LOADING

読み込みが遅い場合はキャッシュを有効にしてください。ブラウザはデフォルトで有効になっています

DP と DDP 実践

目次

DP の実装

コード

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

class SimpleDataset(Dataset):
    def __init__(self, data, target):
        self.data = data
        self.target = target

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]

class SimpleModel(nn.Module):
    def __init__(self, input_dim):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(input_dim, 1)

    def forward(self, x):
        return torch.sigmoid(self.fc(x))

n_sample = 100
n_dim = 10
batch_size = 10
X = torch.randn(n_sample, n_dim)
Y = torch.randint(0, 2, (n_sample, )).float()

dataset = SimpleDataset(X, Y)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# ===== 注:作成されたモデルはCPU上にある ===== #
device_ids = [0, 1, 2]
model = SimpleModel(n_dim).to(device_ids[0])
model = nn.DataParallel(model, device_ids=device_ids)

optimizer = optim.SGD(model.parameters(), lr=0.01)

for epoch in range(10):
    for batch_idx, (inputs, targets) in enumerate(data_loader):
        inputs, targets = inputs.to('cuda'), targets.to('cuda')
        outputs = model(inputs)

        loss = nn.BCELoss()(outputs, targets.unsqueeze(1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}')

主な手順

  • DataParallel を使う手順はただ一つ:
    model = nn.DataParallel(model, device_ids=device_ids)
    

DDP の実装

モデル並列実装手順

  • DistributedDataParallel (DDP) は、複数 GPU や複数ノードで分散学習を行うための PyTorch 公式推奨手法です。
  • モデル並列では、モデルの各層を異なる GPU に分割配置し、データを順次処理します。
  • DDP モデル並列の場合は、モデル分割・勾配同期・データ分配を自動化し、効率的な分散学習を実現します。

プロセスグループの初期化

各プロセス間の通信を設定します。

import torch.distributed as dist

def setup(rank, world_size):
    dist.init_process_group(
        "nccl",              # NCCL通信バックエンド
        rank=rank,           # 現在のプロセスID
        world_size=world_size  # 総プロセス数
    )
    torch.cuda.set_device(rank)  # GPUデバイスを設定

def cleanup():
    dist.destroy_process_group()  # 通信終了

ここで、rank は現在のプロセスの ID、world_size は総プロセス数です。
init_process_group 関数は、プロセス間の通信を初期化します。nccl は NCCL 通信バックエンドを指定します。rank は現在のプロセスの ID、world_size は総プロセス数を指定します。
torch.cuda.set_device(rank) は、現在のプロセスが使用する GPU デバイスを設定します。

モデルの定義と GPU への分割配置

モデルの各層を異なる GPU に配置します。

import torch
import torch.nn as nn

class MyModel(nn.Module):
    def __init__(self, rank):
        super(MyModel, self).__init__()
        self.part1 = nn.Linear(10, 10).to(rank*2)        # GPU0に配置
        self.part2 = nn.Linear(10, 5).to(rank*2 + 1)    # GPU1に配置

    def forward(self, x):
        x = self.part1(x)
        x = x.to(rank + 1)  # データをGPU1に転送
        return self.part2(x)

model = MyModel(rank=0)  # rank=0のGPUにモデルを初期配置

ここで、MyModel クラスの forward メソッドでデータを異なる GPU 間で転送します。

DDP でモデルをラップ

モデルを DistributedDataParallel でラップし、分散学習を有効化します。

model = nn.parallel.DistributedDataParallel(model, device_ids=[0, 1])
  • device_ids: 使用する GPU の ID を指定(例: [0, 1])。

データローダーの設定

DistributedSampler を使用してデータを均等に分配します。

from torch.utils.data import DataLoader, DistributedSampler
from torchvision.datasets import TensorDataset

# データセットの作成
dataset = TensorDataset(torch.randn(100, 10), torch.randn(100, 5))

# 分散サンプラー
sampler = DistributedSampler(dataset, num_replicas=4, rank=0)
data_loader = DataLoader(dataset, batch_size=10, sampler=sampler)

学習ループ

各プロセスが割り当てられたデータで学習を行います。

optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.MSELoss()

for data, target in data_loader:
    optimizer.zero_grad()
    output = model(data.cuda(0))  # GPU0で入力データを処理
    loss = criterion(output, target.cuda(0))  # GPU0で損失計算
    loss.backward()  # 勾配計算
    optimizer.step()  # パラメータ更新

リソースの解放

学習終了時にプロセスグループを破棄します。

cleanup()

重要なポイント

項目 説明
プロセス構造 各 GPU に独立したプロセスを割り当て、GIL の影響を回避
通信方式 NCCL バックエンドを使用した AllReduce で勾配同期
モデル分割 モデルの各層を異なる GPU に手動で配置(例: to(rank)
勾配同期 DDP が自動で勾配を全プロセス間で総和(AllReduce)
データ分配 DistributedSamplerで均等にミニバッチを分割
メモリ効率 勾配バケット化により通信と計算を並列化(パイプライン処理)

注意事項

  • プロセスと GPU の対応
    各プロセスは 1 つの GPU のみを使用(例: rank=0 → GPU0)。
  • モデルの手動分割
    モデルの各層を明示的に異なる GPU に配置する必要がある。
  • 同期の保証
    DDP によりパラメータ更新が自動同期されるが、中間データの転送(例: x.to(rank+1))は手動で管理。
  • マルチノード対応
    複数ノードでの実行にはネットワーク設定(例: IP アドレス・ポート番号)が必要。

データ並列化の実装手順

  • DistributedDataParallel (DDP) は、複数 GPU や複数ノードで分散学習を行うための PyTorch 公式推奨手法です。
  • データ並列では、各プロセスが完全なモデルコピーを持ち、異なるデータサブセットで独立して学習します。
  • プロセス間で勾配を同期(AllReduce)することで、モデルパラメータを統一します。

プロセスグループの初期化

各プロセス間の通信を設定します。

import torch.distributed as dist

def setup(local_rank):
    dist.init_process_group(backend='nccl')  # NCCLバックエンドで通信を初期化
    torch.cuda.set_device(local_rank)        # 現在のプロセスが使用するGPUを設定
  • local_rank: 現在のプロセスが使用する GPU のローカル ID(例: 0, 1)。
  • backend='nccl': NVIDIA GPU 向けの高速通信ライブラリ NCCL を使用。

モデルの定義と DDP ラップ

モデルを各 GPU に配置し、DDP でラップします。

model = SimpleModel(n_dim).to(local_rank)  # モデルをGPUに配置
model = DDP(model, device_ids=[local_rank], output_device=local_rank)  # DDPでラップ
  • device_ids: 使用する GPU の ID を指定(例: [0, 1])。
  • output_device: 出力のデバイスを指定(通常、local_rankと同じ)。

データローダーの設定

DistributedSampler を使用してデータを均等に分配します。

sampler = torch.utils.data.distributed.DistributedSampler(dataset)  # 分散サンプラー
data_loader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)
  • DistributedSampler の役割:
    • 各プロセスが重複しないデータサブセットを取得。
    • シャッフル時にプロセス間で同じ乱数シードを維持。

学習ループ

各プロセスが割り当てられたデータで学習を行い、勾配同期を行います。

for epoch in range(num_epochs):
    data_loader.sampler.set_epoch(epoch)  # シャッフルのシード同期
    for data, label in data_loader:
        data, label = data.to(local_rank), label.to(local_rank)
        optimizer.zero_grad()
        prediction = model(data)
        loss = loss_func(prediction, label.unsqueeze(1))
        loss.backward()  # 勾配計算(各プロセスで独立)
        optimizer.step()  # 勾配同期(AllReduce)後に更新
  • 勾配同期の仕組み:
    • 各プロセスが逆伝播で勾配を計算。
    • optimizer.step() で AllReduce により勾配総和を同期。

モデルの保存

プロセス 0 のみでモデルを保存します。

if dist.get_rank() == 0:
    torch.save(model.module.state_dict(), "model.pt")  # model.moduleでラップ解除

ここで、model.module は DDP ラップ解除後のモデルへの参照です。

DDP のデータ並列の特徴

特徴 説明
プロセス構造 各 GPU に独立したプロセスを割り当て(マルチプロセス)、GIL の影響を回避。
通信方式 NCCL バックエンドによる AllReduce で勾配総和を同期。
データ分配 DistributedSamplerでデータを均等に分割(重複なし)。
勾配同期 バケット単位の AllReduce で通信と計算を並列化(パイプライン処理)。
メモリ効率 勾配バケット化により通信と計算を最適化。
スケーラビリティ GPU 数増加時でも通信コストが固定に近い(固定オーバーヘッド)。

起動コマンド

torch.distributed.launch を使用

  • 単一ノードの場合:

    CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 train_ddp.py
    
  • 複数ノードの場合:

    # ノード0(ip=192.168.0.10):
    CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nnodes=2 --node_rank=0 --master_addr="192.168.0.10" --master_port=12345 --nproc_per_node=2 --use_env train_ddp.py --batch_size=64 --lr=0.01
    
    # ノード1(ip=192.168.0.11):
    CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nnodes=2 --node_rank=1 --master_addr="192.168.0.10" --master_port=12345 --nproc_per_node=2 --use_env train_ddp.py --batch_size=64 --lr=0.01
    

torchrun を使用

  • 単一ノードの場合:

    torchrun --nnodes=1 --nproc_per_node=2 train_ddp.py --batch_size=64 --lr=0.01
    
  • 複数ノードの場合:

    # ノード0(ip=192.168.0.10):
    torchrun --nnodes=2 --node_rank=0 --master_addr="192.168.0.10" --master_port=12345 --nproc_per_node=2 train_ddp.py --batch_size=64 --lr=0.01
    
    # ノード1(ip=192.168.0.11):
    torchrun --nnodes=2 --node_rank=1 --master_addr="192.168.0.10" --master_port=12345 --nproc_per_node=2 train_ddp.py --batch_size=64 --lr=0.01
    

オプション説明

  • --nproc_per_node: 1 ノードあたりのプロセス数。
  • --nnodes: ノード数。
  • --master_addr: マスターノードの IP アドレス。
  • --master_port: マスターノードのポート番号。
  • --use_env: 環境変数を参照してプロセスを起動する。(torchrun は必要ではない)

init process group

init process group 関数の引数

torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)
パラメータ名 説明
backend 通信バックエンド (nccl, gloo, mpi, ucc) を指定。GPU ならncclが推奨。
init_method プロセス間接続の初期化方法 (env://, tcp://<ip>:<port>, file://<path>, mpi://)。
timeout 通信操作のタイムアウト時間(デフォルト: NCCL は 10 分)。
world_size 参加プロセス総数(例: [4](file://d:\code\MYBLOG\themes\next\scripts\tags\group-pictures.js#L18-L23))。
rank 現在のプロセスの ID(0 からworld_size-1まで)。
store 接続情報を共有するストア(init_methodと排他使用)。
device_id 特定デバイス(例: GPU)にプロセスをバインド(バックエンド最適化用)。

init method の詳細

プロセス間の接続情報を共有する方法を指定。以下のいずれかを使用。

  1. 環境変数 (env://)

    • 主に使用される方法。
    • 環境変数 MASTER_ADDR(マスターノードの IP)と MASTER_PORT(ポート番号)を設定。
    • サンプルコード:
      import os
      os.environ['MASTER_ADDR'] = 'localhost'
      os.environ['MASTER_PORT'] = '12345'
      torch.distributed.init_process_group(backend='nccl', init_method='env://')
      
  2. TCP (tcp://<ip>:<port>)

    • マスターノードの IP とポートを直接指定。
    • サンプル:
      torch.distributed.init_process_group(
          backend='nccl',
          init_method='tcp://10.1.1.20:23456'
      )
      
  3. 共有ファイルシステム (file://<path>)

    • 共有ディレクトリ内のファイルを使用して接続情報を交換。
    • サンプル:
      torch.distributed.init_process_group(
          backend='nccl',
          init_method='file:///mnt/nfs/sharedfile'
      )
      
  4. MPI (mpi://)

    • MPI ランタイム環境を利用する(mpiバックエンド専用)。
    • サンプル:
      torch.distributed.init_process_group(
          backend='mpi',
          init_method='mpi://'
      )
      

初期化後の情報取得

初期化後、以下の関数でプロセス情報を取得可能:

  • dist.get_rank(): 現在のプロセスのランク(ID)を取得。
  • dist.get_world_size(): プロセス総数を取得。
  • dist.get_backend(): 使用中のバックエンドを取得。
  • os.getpid(): 現在のプロセスの PID を取得。

注意点

  • init_methodbackend は全プロセスで同一の設定が必要。
  • 初期化後は PyTorch 内部でプロセスグループを管理するため、明示的な返り値はなし。
  • デバッグ時はタイムアウト時間を短く設定(例: timeout=datetime.timedelta(seconds=30))。

参考

avatar
lijunjie2232
個人技術ブログ
My Github
目次0