DP と DDP 実践
目次
DP の実装
コード
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
class SimpleDataset(Dataset):
def __init__(self, data, target):
self.data = data
self.target = target
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx], self.target[idx]
class SimpleModel(nn.Module):
def __init__(self, input_dim):
super(SimpleModel, self).__init__()
self.fc = nn.Linear(input_dim, 1)
def forward(self, x):
return torch.sigmoid(self.fc(x))
n_sample = 100
n_dim = 10
batch_size = 10
X = torch.randn(n_sample, n_dim)
Y = torch.randint(0, 2, (n_sample, )).float()
dataset = SimpleDataset(X, Y)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# ===== 注:作成されたモデルはCPU上にある ===== #
device_ids = [0, 1, 2]
model = SimpleModel(n_dim).to(device_ids[0])
model = nn.DataParallel(model, device_ids=device_ids)
optimizer = optim.SGD(model.parameters(), lr=0.01)
for epoch in range(10):
for batch_idx, (inputs, targets) in enumerate(data_loader):
inputs, targets = inputs.to('cuda'), targets.to('cuda')
outputs = model(inputs)
loss = nn.BCELoss()(outputs, targets.unsqueeze(1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}')
主な手順
DataParallelを使う手順はただ一つ:model = nn.DataParallel(model, device_ids=device_ids)
DDP の実装
モデル並列実装手順
- DistributedDataParallel (DDP) は、複数 GPU や複数ノードで分散学習を行うための PyTorch 公式推奨手法です。
- モデル並列では、モデルの各層を異なる GPU に分割配置し、データを順次処理します。
- DDP モデル並列の場合は、モデル分割・勾配同期・データ分配を自動化し、効率的な分散学習を実現します。
プロセスグループの初期化
各プロセス間の通信を設定します。
import torch.distributed as dist
def setup(rank, world_size):
dist.init_process_group(
"nccl", # NCCL通信バックエンド
rank=rank, # 現在のプロセスID
world_size=world_size # 総プロセス数
)
torch.cuda.set_device(rank) # GPUデバイスを設定
def cleanup():
dist.destroy_process_group() # 通信終了
ここで、rank は現在のプロセスの ID、world_size は総プロセス数です。
init_process_group 関数は、プロセス間の通信を初期化します。nccl は NCCL 通信バックエンドを指定します。rank は現在のプロセスの ID、world_size は総プロセス数を指定します。
torch.cuda.set_device(rank) は、現在のプロセスが使用する GPU デバイスを設定します。
モデルの定義と GPU への分割配置
モデルの各層を異なる GPU に配置します。
import torch
import torch.nn as nn
class MyModel(nn.Module):
def __init__(self, rank):
super(MyModel, self).__init__()
self.part1 = nn.Linear(10, 10).to(rank*2) # GPU0に配置
self.part2 = nn.Linear(10, 5).to(rank*2 + 1) # GPU1に配置
def forward(self, x):
x = self.part1(x)
x = x.to(rank + 1) # データをGPU1に転送
return self.part2(x)
model = MyModel(rank=0) # rank=0のGPUにモデルを初期配置
ここで、MyModel クラスの forward メソッドでデータを異なる GPU 間で転送します。
DDP でモデルをラップ
モデルを DistributedDataParallel でラップし、分散学習を有効化します。
model = nn.parallel.DistributedDataParallel(model, device_ids=[0, 1])
device_ids: 使用する GPU の ID を指定(例:[0, 1])。
データローダーの設定
DistributedSampler を使用してデータを均等に分配します。
from torch.utils.data import DataLoader, DistributedSampler
from torchvision.datasets import TensorDataset
# データセットの作成
dataset = TensorDataset(torch.randn(100, 10), torch.randn(100, 5))
# 分散サンプラー
sampler = DistributedSampler(dataset, num_replicas=4, rank=0)
data_loader = DataLoader(dataset, batch_size=10, sampler=sampler)
学習ループ
各プロセスが割り当てられたデータで学習を行います。
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.MSELoss()
for data, target in data_loader:
optimizer.zero_grad()
output = model(data.cuda(0)) # GPU0で入力データを処理
loss = criterion(output, target.cuda(0)) # GPU0で損失計算
loss.backward() # 勾配計算
optimizer.step() # パラメータ更新
リソースの解放
学習終了時にプロセスグループを破棄します。
cleanup()
重要なポイント
| 項目 | 説明 |
|---|---|
| プロセス構造 | 各 GPU に独立したプロセスを割り当て、GIL の影響を回避 |
| 通信方式 | NCCL バックエンドを使用した AllReduce で勾配同期 |
| モデル分割 | モデルの各層を異なる GPU に手動で配置(例: to(rank)) |
| 勾配同期 | DDP が自動で勾配を全プロセス間で総和(AllReduce) |
| データ分配 | DistributedSamplerで均等にミニバッチを分割 |
| メモリ効率 | 勾配バケット化により通信と計算を並列化(パイプライン処理) |
注意事項
- プロセスと GPU の対応
各プロセスは 1 つの GPU のみを使用(例:rank=0 → GPU0)。 - モデルの手動分割
モデルの各層を明示的に異なる GPU に配置する必要がある。 - 同期の保証
DDP によりパラメータ更新が自動同期されるが、中間データの転送(例:x.to(rank+1))は手動で管理。 - マルチノード対応
複数ノードでの実行にはネットワーク設定(例: IP アドレス・ポート番号)が必要。
データ並列化の実装手順
- DistributedDataParallel (DDP) は、複数 GPU や複数ノードで分散学習を行うための PyTorch 公式推奨手法です。
- データ並列では、各プロセスが完全なモデルコピーを持ち、異なるデータサブセットで独立して学習します。
- プロセス間で勾配を同期(AllReduce)することで、モデルパラメータを統一します。
プロセスグループの初期化
各プロセス間の通信を設定します。
import torch.distributed as dist
def setup(local_rank):
dist.init_process_group(backend='nccl') # NCCLバックエンドで通信を初期化
torch.cuda.set_device(local_rank) # 現在のプロセスが使用するGPUを設定
local_rank: 現在のプロセスが使用する GPU のローカル ID(例: 0, 1)。backend='nccl': NVIDIA GPU 向けの高速通信ライブラリ NCCL を使用。
モデルの定義と DDP ラップ
モデルを各 GPU に配置し、DDP でラップします。
model = SimpleModel(n_dim).to(local_rank) # モデルをGPUに配置
model = DDP(model, device_ids=[local_rank], output_device=local_rank) # DDPでラップ
device_ids: 使用する GPU の ID を指定(例:[0, 1])。output_device: 出力のデバイスを指定(通常、local_rankと同じ)。
データローダーの設定
DistributedSampler を使用してデータを均等に分配します。
sampler = torch.utils.data.distributed.DistributedSampler(dataset) # 分散サンプラー
data_loader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)
- DistributedSampler の役割:
- 各プロセスが重複しないデータサブセットを取得。
- シャッフル時にプロセス間で同じ乱数シードを維持。
学習ループ
各プロセスが割り当てられたデータで学習を行い、勾配同期を行います。
for epoch in range(num_epochs):
data_loader.sampler.set_epoch(epoch) # シャッフルのシード同期
for data, label in data_loader:
data, label = data.to(local_rank), label.to(local_rank)
optimizer.zero_grad()
prediction = model(data)
loss = loss_func(prediction, label.unsqueeze(1))
loss.backward() # 勾配計算(各プロセスで独立)
optimizer.step() # 勾配同期(AllReduce)後に更新
- 勾配同期の仕組み:
- 各プロセスが逆伝播で勾配を計算。
optimizer.step()で AllReduce により勾配総和を同期。
モデルの保存
プロセス 0 のみでモデルを保存します。
if dist.get_rank() == 0:
torch.save(model.module.state_dict(), "model.pt") # model.moduleでラップ解除
ここで、model.module は DDP ラップ解除後のモデルへの参照です。
DDP のデータ並列の特徴
| 特徴 | 説明 |
|---|---|
| プロセス構造 | 各 GPU に独立したプロセスを割り当て(マルチプロセス)、GIL の影響を回避。 |
| 通信方式 | NCCL バックエンドによる AllReduce で勾配総和を同期。 |
| データ分配 | DistributedSamplerでデータを均等に分割(重複なし)。 |
| 勾配同期 | バケット単位の AllReduce で通信と計算を並列化(パイプライン処理)。 |
| メモリ効率 | 勾配バケット化により通信と計算を最適化。 |
| スケーラビリティ | GPU 数増加時でも通信コストが固定に近い(固定オーバーヘッド)。 |
起動コマンド
torch.distributed.launch を使用
-
単一ノードの場合:
CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 train_ddp.py -
複数ノードの場合:
# ノード0(ip=192.168.0.10): CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nnodes=2 --node_rank=0 --master_addr="192.168.0.10" --master_port=12345 --nproc_per_node=2 --use_env train_ddp.py --batch_size=64 --lr=0.01 # ノード1(ip=192.168.0.11): CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nnodes=2 --node_rank=1 --master_addr="192.168.0.10" --master_port=12345 --nproc_per_node=2 --use_env train_ddp.py --batch_size=64 --lr=0.01
torchrun を使用
-
単一ノードの場合:
torchrun --nnodes=1 --nproc_per_node=2 train_ddp.py --batch_size=64 --lr=0.01 -
複数ノードの場合:
# ノード0(ip=192.168.0.10): torchrun --nnodes=2 --node_rank=0 --master_addr="192.168.0.10" --master_port=12345 --nproc_per_node=2 train_ddp.py --batch_size=64 --lr=0.01 # ノード1(ip=192.168.0.11): torchrun --nnodes=2 --node_rank=1 --master_addr="192.168.0.10" --master_port=12345 --nproc_per_node=2 train_ddp.py --batch_size=64 --lr=0.01
オプション説明
--nproc_per_node: 1 ノードあたりのプロセス数。--nnodes: ノード数。--master_addr: マスターノードの IP アドレス。--master_port: マスターノードのポート番号。--use_env: 環境変数を参照してプロセスを起動する。(torchrun は必要ではない)
init process group
init process group 関数の引数
torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)
| パラメータ名 | 説明 |
|---|---|
backend |
通信バックエンド (nccl, gloo, mpi, ucc) を指定。GPU ならncclが推奨。 |
init_method |
プロセス間接続の初期化方法 (env://, tcp://<ip>:<port>, file://<path>, mpi://)。 |
timeout |
通信操作のタイムアウト時間(デフォルト: NCCL は 10 分)。 |
world_size |
参加プロセス総数(例: [4](file://d:\code\MYBLOG\themes\next\scripts\tags\group-pictures.js#L18-L23))。 |
rank |
現在のプロセスの ID(0 からworld_size-1まで)。 |
store |
接続情報を共有するストア(init_methodと排他使用)。 |
device_id |
特定デバイス(例: GPU)にプロセスをバインド(バックエンド最適化用)。 |
init method の詳細
プロセス間の接続情報を共有する方法を指定。以下のいずれかを使用。
-
環境変数 (
env://)- 主に使用される方法。
- 環境変数
MASTER_ADDR(マスターノードの IP)とMASTER_PORT(ポート番号)を設定。 - サンプルコード:
import os os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '12345' torch.distributed.init_process_group(backend='nccl', init_method='env://')
-
TCP (
tcp://<ip>:<port>)- マスターノードの IP とポートを直接指定。
- サンプル:
torch.distributed.init_process_group( backend='nccl', init_method='tcp://10.1.1.20:23456' )
-
共有ファイルシステム (
file://<path>)- 共有ディレクトリ内のファイルを使用して接続情報を交換。
- サンプル:
torch.distributed.init_process_group( backend='nccl', init_method='file:///mnt/nfs/sharedfile' )
-
MPI (
mpi://)- MPI ランタイム環境を利用する(
mpiバックエンド専用)。 - サンプル:
torch.distributed.init_process_group( backend='mpi', init_method='mpi://' )
- MPI ランタイム環境を利用する(
初期化後の情報取得
初期化後、以下の関数でプロセス情報を取得可能:
dist.get_rank(): 現在のプロセスのランク(ID)を取得。dist.get_world_size(): プロセス総数を取得。dist.get_backend(): 使用中のバックエンドを取得。os.getpid(): 現在のプロセスの PID を取得。
注意点
init_methodとbackendは全プロセスで同一の設定が必要。- 初期化後は PyTorch 内部でプロセスグループを管理するため、明示的な返り値はなし。
- デバッグ時はタイムアウト時間を短く設定(例:
timeout=datetime.timedelta(seconds=30))。
参考
- 公式ドキュメント: PyTorch Distributed