数据并行训练¶
在 常见的分布式并行策略 一文中介绍了数据并行的特点。 在 OneFlow 中,提供了两种做数据并行的方式。
一种是使用 OneFlow 的原生的 SBP 概念,通过设置 global 张量,进行数据并行训练,这也是用 OneFlow 做数据并行训练的 推荐方式 。
此外,为了方便从 PyTorch 迁移到 OneFlow 的用户,OneFlow 提供了与 torch.nn.parallel.DistributedDataParallel
对齐一致的接口 oneflow.nn.parallel.DistributedDataParallel,它也能让用户方便地从单机训练脚本,扩展为数据并行训练。
通过设置 SBP 做数据并行训练¶
以下代码,是通过配置设置 global 张量,完成数据并行训练。点击以下 “Code” 查看详细代码。
Code
import oneflow as flow
import oneflow.nn as nn
import flowvision
import flowvision.transforms as transforms
BATCH_SIZE=64
EPOCH_NUM = 1
PLACEMENT = flow.placement("cuda", [0,1])
S0 = flow.sbp.split(0)
B = flow.sbp.broadcast
DEVICE = "cuda" if flow.cuda.is_available() else "cpu"
print("Using {} device".format(DEVICE))
training_data = flowvision.datasets.CIFAR10(
root="data",
train=True,
transform=transforms.ToTensor(),
download=True,
)
train_dataloader = flow.utils.data.DataLoader(
training_data, BATCH_SIZE, shuffle=True
)
model = flowvision.models.mobilenet_v2().to(DEVICE)
model.classifer = nn.Sequential(nn.Dropout(0.2), nn.Linear(model.last_channel, 10))
model = model.to_global(placement=PLACEMENT, sbp=B)
loss_fn = nn.CrossEntropyLoss().to(DEVICE)
optimizer = flow.optim.SGD(model.parameters(), lr=1e-3)
for t in range(EPOCH_NUM):
print(f"Epoch {t+1}\n-------------------------------")
size = len(train_dataloader.dataset)
for batch, (x, y) in enumerate(train_dataloader):
x = x.to_global(placement=PLACEMENT, sbp=S0)
y = y.to_global(placement=PLACEMENT, sbp=S0)
# Compute prediction error
pred = model(x)
loss = loss_fn(pred, y)
# Backpropagation
optimizer.zero_grad()
loss.backward()
optimizer.step()
current = batch * BATCH_SIZE
if batch % 5 == 0:
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
可以发现,这个脚本的与单机单卡的训练脚本几乎是一样的。少数的区别在于几行与 global tensor 有关的配置代码外,它们是:
- 设置 placement,让训练放置在集群第 0号、1号 GPU 上:
PLACEMENT = flow.placement("cuda", [0,1])
- 模型在集群上做广播
model = model.to_global(placement=PLACEMENT, sbp=B)
- 数据在集群上按
split(0)
做切分:
x = x.to_global(placement=PLACEMENT, sbp=S0)
y = y.to_global(placement=PLACEMENT, sbp=S0)
这样,按照 常见的分布式并行策略 中的介绍,我们就通过对数据进行 split(0)
切分,对模型进行广播,进行了分布式数据并行训练。
使用 DistributedDataParallel 做数据并行训练¶
可以用以下命令快速体验 oneflow.nn.parallel.DistributedDataParallel
做数据并行:
wget https://docs.oneflow.org/master/code/parallelism/ddp_train.py #下载脚本
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./ddp_train.py #数据并行训练
输出:
50/500 loss:0.004111831542104483
50/500 loss:0.00025336415274068713
...
500/500 loss:6.184563972055912e-11
500/500 loss:4.547473508864641e-12
w:tensor([[2.0000],
[3.0000]], device='cuda:1', dtype=oneflow.float32,
grad_fn=<accumulate_grad>)
w:tensor([[2.0000],
[3.0000]], device='cuda:0', dtype=oneflow.float32,
grad_fn=<accumulate_grad>)
点击以下 “Code” 可以展开以上运行脚本的代码。
Code
import oneflow as flow
from oneflow.nn.parallel import DistributedDataParallel as ddp
train_x = [
flow.tensor([[1, 2], [2, 3]], dtype=flow.float32),
flow.tensor([[4, 6], [3, 1]], dtype=flow.float32),
]
train_y = [
flow.tensor([[8], [13]], dtype=flow.float32),
flow.tensor([[26], [9]], dtype=flow.float32),
]
class Model(flow.nn.Module):
def __init__(self):
super().__init__()
self.lr = 0.01
self.iter_count = 500
self.w = flow.nn.Parameter(flow.tensor([[0], [0]], dtype=flow.float32))
def forward(self, x):
x = flow.matmul(x, self.w)
return x
m = Model().to("cuda")
m = ddp(m)
loss = flow.nn.MSELoss(reduction="sum")
optimizer = flow.optim.SGD(m.parameters(), m.lr)
for i in range(0, m.iter_count):
rank = flow.env.get_rank()
x = train_x[rank].to("cuda")
y = train_y[rank].to("cuda")
y_pred = m(x)
l = loss(y_pred, y)
if (i + 1) % 50 == 0:
print(f"{i+1}/{m.iter_count} loss:{l}")
optimizer.zero_grad()
l.backward()
optimizer.step()
print(f"\nw:{m.w}")
可以发现,它与单机单卡脚本的不同只有2个:
- 使用
DistributedDataParallel
处理一下 module 对象(m = ddp(m)
) - 使用 get_rank 获取当前设备编号,并针对设备分发数据
然后使用 launcher
启动脚本,把剩下的一切都交给 OneFlow,让分布式训练,像单机单卡训练一样简单:
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./ddp_train.py
DistributedSampler¶
本文为了简化问题,突出 DistributedDataParallel
,因此使用的数据是手工分发的。在实际应用中,可以直接使用 DistributedSampler 配合数据并行使用。
DistributedSampler
会在每个进程中实例化 Dataloader,每个 Dataloader 实例会加载完整数据的一部分,自动完成数据的分发。