DATA PARALLELISM TRAINING
In Common Distributed Parallel Strategies, we introduced the characteristics of data parallelism.
OneFlow provides two ways to run data parallel training. The first uses OneFlow's native concepts: you configure global tensors (their placement and SBP). This is the recommended way to run data parallel training on OneFlow.
In addition, to help users migrating from PyTorch, OneFlow offers oneflow.nn.parallel.DistributedDataParallel, an interface consistent with PyTorch's torch.nn.parallel.DistributedDataParallel, so that users can conveniently extend single-machine training to data parallel training.
Run Data Parallel Training With SBP Configuration
The following code runs data parallel training by configuring global tensors.
import oneflow as flow
import oneflow.nn as nn
import flowvision
import flowvision.transforms as transforms
BATCH_SIZE=64
EPOCH_NUM = 1
PLACEMENT = flow.placement("cuda", [0,1])
S0 = flow.sbp.split(0)
B = flow.sbp.broadcast
DEVICE = "cuda" if flow.cuda.is_available() else "cpu"
print("Using {} device".format(DEVICE))
training_data = flowvision.datasets.CIFAR10(
    root="data",
    train=True,
    transform=transforms.ToTensor(),
    download=True,
)
train_dataloader = flow.utils.data.DataLoader(
    training_data, BATCH_SIZE, shuffle=True
)
model = flowvision.models.mobilenet_v2().to(DEVICE)
model.classifier = nn.Sequential(nn.Dropout(0.2), nn.Linear(model.last_channel, 10))
model = model.to_global(placement=PLACEMENT, sbp=B)
loss_fn = nn.CrossEntropyLoss().to(DEVICE)
optimizer = flow.optim.SGD(model.parameters(), lr=1e-3)
for t in range(EPOCH_NUM):
    print(f"Epoch {t+1}\n-------------------------------")
    size = len(train_dataloader.dataset)
    for batch, (x, y) in enumerate(train_dataloader):
        x = x.to_global(placement=PLACEMENT, sbp=S0)
        y = y.to_global(placement=PLACEMENT, sbp=S0)
        # Compute prediction error
        pred = model(x)
        loss = loss_fn(pred, y)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        current = batch * BATCH_SIZE
        if batch % 5 == 0:
            print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
This script is almost identical to the training script for a single machine with a single device. Only a few lines, which set up the global tensors, differ:
- Set the placement so that training runs on GPUs 0 and 1:
PLACEMENT = flow.placement("cuda", [0,1])
- Broadcast the model across the cluster:
model = model.to_global(placement=PLACEMENT, sbp=B)
- Split the data across the cluster with split(0):
x = x.to_global(placement=PLACEMENT, sbp=S0)
y = y.to_global(placement=PLACEMENT, sbp=S0)
With the data split along dimension 0 and the model broadcast to every device, the script performs data parallel training as described in Common Distributed Parallel Strategies.
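To make the effect of split(0) and broadcast concrete, here is a minimal conceptual sketch in plain Python. It does not use OneFlow: lists stand in for tensors, and the helper names split_dim0 and broadcast are hypothetical, introduced only for illustration. It also assumes the batch divides evenly across devices.

```python
# Conceptual sketch only: plain Python lists stand in for tensors and devices.

def split_dim0(batch, num_devices):
    """Mimic sbp=split(0): cut the batch into contiguous shards along dimension 0."""
    if len(batch) % num_devices != 0:
        raise ValueError("this sketch assumes the batch divides evenly")
    shard = len(batch) // num_devices
    return [batch[d * shard:(d + 1) * shard] for d in range(num_devices)]


def broadcast(value, num_devices):
    """Mimic sbp=broadcast: every device holds a full copy of the value."""
    return [list(value) for _ in range(num_devices)]


batch = list(range(64))        # stand-in for one batch of 64 samples
weights = [0.5, -1.0, 2.0]     # stand-in for the model parameters

data_shards = split_dim0(batch, num_devices=2)
weight_copies = broadcast(weights, num_devices=2)

for device, (shard, copy) in enumerate(zip(data_shards, weight_copies)):
    print(f"device {device}: {len(shard)} samples, weights {copy}")
```

Each device sees half of the batch but the complete set of parameters, which is the defining layout of data parallelism.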
Run Data Parallel Training With DistributedDataParallel
The following commands provide a quick start for data parallel training with oneflow.nn.parallel.DistributedDataParallel:
wget https://docs.oneflow.org/master/code/parallelism/ddp_train.py  # download the script
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./ddp_train.py  # run data parallel training
Out:
50/500 loss:0.004111831542104483
50/500 loss:0.00025336415274068713
...
500/500 loss:6.184563972055912e-11
500/500 loss:4.547473508864641e-12
w:tensor([[2.0000],
[3.0000]], device='cuda:1', dtype=oneflow.float32,
grad_fn=<accumulate_grad>)
w:tensor([[2.0000],
[3.0000]], device='cuda:0', dtype=oneflow.float32,
grad_fn=<accumulate_grad>)
The code of the script run above is as follows:
import oneflow as flow
from oneflow.nn.parallel import DistributedDataParallel as ddp
train_x = [
    flow.tensor([[1, 2], [2, 3]], dtype=flow.float32),
    flow.tensor([[4, 6], [3, 1]], dtype=flow.float32),
]
train_y = [
    flow.tensor([[8], [13]], dtype=flow.float32),
    flow.tensor([[26], [9]], dtype=flow.float32),
]
class Model(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.lr = 0.01
        self.iter_count = 500
        self.w = flow.nn.Parameter(flow.tensor([[0], [0]], dtype=flow.float32))
    def forward(self, x):
        x = flow.matmul(x, self.w)
        return x
m = Model().to("cuda")
m = ddp(m)
loss = flow.nn.MSELoss(reduction="sum")
optimizer = flow.optim.SGD(m.parameters(), m.lr)
for i in range(0, m.iter_count):
    rank = flow.env.get_rank()
    x = train_x[rank].to("cuda")
    y = train_y[rank].to("cuda")
    y_pred = m(x)
    l = loss(y_pred, y)
    if (i + 1) % 50 == 0:
        print(f"{i+1}/{m.iter_count} loss:{l}")
    optimizer.zero_grad()
    l.backward()
    optimizer.step()
print(f"\nw:{m.w}")
The data parallel training code differs from the single-machine, single-device script in only two places:
- Use DistributedDataParallel to wrap the module object (m = ddp(m)).
- Use get_rank to get the rank of the current process and select the data for its device.
Then use the launcher to run the script and leave everything else to OneFlow, which makes distributed training as simple as single-machine, single-device training:
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./ddp_train.py
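To see why both processes in the output above end up with the same w, the following sketch simulates the two ranks inside one ordinary Python process, without OneFlow. It assumes that the wrapper averages gradients across processes before the optimizer step (as PyTorch's DistributedDataParallel does). The helper names local_gradient and all_reduce_mean are hypothetical and not part of any OneFlow API.

```python
# Hypothetical simulation: two "ranks" run in one process, no OneFlow involved.
train_x = [
    [[1.0, 2.0], [2.0, 3.0]],   # data held by rank 0
    [[4.0, 6.0], [3.0, 1.0]],   # data held by rank 1
]
train_y = [
    [8.0, 13.0],
    [26.0, 9.0],
]


def local_gradient(w, xs, ys):
    """Gradient of the summed squared error sum((x.w - y)^2) on one rank's data."""
    grad = [0.0, 0.0]
    for x, y in zip(xs, ys):
        err = x[0] * w[0] + x[1] * w[1] - y
        grad[0] += 2.0 * err * x[0]
        grad[1] += 2.0 * err * x[1]
    return grad


def all_reduce_mean(grads):
    """Average the per-rank gradients (the assumed synchronization step)."""
    n = len(grads)
    return [sum(g[i] for g in grads) / n for i in range(len(grads[0]))]


lr = 0.01
world_size = 2
w = [0.0, 0.0]   # every rank starts from the same parameters
for _ in range(500):
    grads = [local_gradient(w, train_x[r], train_y[r]) for r in range(world_size)]
    avg = all_reduce_mean(grads)
    # Every rank applies the same averaged gradient, so the replicas stay identical.
    w = [w[0] - lr * avg[0], w[1] - lr * avg[1]]

print(w)  # approaches [2, 3], the weights that fit all four samples
```

Because each rank only ever sees its own two samples, the shared result depends entirely on the gradient exchange. Without it, each replica would fit only its local data.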
DistributedSampler
In this example the data is distributed manually to highlight DistributedDataParallel. In practical applications, however, you can use DistributedSampler together with data parallelism.
When each process instantiates its DataLoader with a DistributedSampler, each DataLoader instance loads only a part of the complete dataset, so the data is distributed automatically.
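The partitioning idea can be sketched in a few lines of plain Python. This is a simplified illustration modeled on the strided scheme used by PyTorch's DistributedSampler, not OneFlow's actual implementation. The function shard_indices and its parameters are hypothetical.

```python
import random


def shard_indices(dataset_len, world_size, rank, shuffle=True, seed=0):
    """Return the sample indices one rank would load (strided partition)."""
    indices = list(range(dataset_len))
    if shuffle:
        # The same seed on every rank yields the same permutation everywhere.
        random.Random(seed).shuffle(indices)
    # Pad by repeating the leading indices so every rank gets the same count.
    per_rank = -(-dataset_len // world_size)  # ceiling division
    total = per_rank * world_size
    indices += indices[: total - dataset_len]
    # Rank r takes every world_size-th index, starting at position r.
    return indices[rank:total:world_size]


world_size = 2
parts = [shard_indices(10, world_size, rank) for rank in range(world_size)]
for rank, part in enumerate(parts):
    print(f"rank {rank} loads indices {part}")
```

All ranks must shuffle with the same seed. Otherwise their permutations differ and the shards may overlap or miss samples.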