This article is part of a series about distributed AI across multiple GPUs:
- Part 1: Understanding the Host and Device Paradigm
- Part 2: Point-to-Point and Collective Operations (this article)
- Part 3: How GPUs Talk (coming soon)
- Part 4: Gradient Accumulation & Distributed Data Parallelism (DDP) (coming soon)
- Part 5: ZeRO (coming soon)
- Part 6: Tensor Parallelism (coming soon)
Introduction
In the previous post, we established the host-device paradigm and introduced the concept of ranks for multi-GPU workloads. Now, we’ll explore the specific communication patterns provided by PyTorch’s torch.distributed module to coordinate work and exchange data between these ranks. These operations, known as collectives, are the building blocks of distributed workloads.
Although PyTorch exposes these operations, it ultimately calls a backend library that actually implements the communication. For NVIDIA GPUs, it’s NCCL (NVIDIA Collective Communications Library), while for AMD it’s RCCL (ROCm Communication Collectives Library).
NCCL implements multi-GPU and multi-node communication primitives optimized for NVIDIA GPUs and networking. It automatically detects the current topology (communication channels like PCIe, NVLink, InfiniBand) and selects the most efficient one.
Disclaimer 1: Since NVIDIA GPUs are the most common, we’ll focus on the NCCL backend for this post.
Disclaimer 2: For brevity, the code presented below only provides the main arguments of each method instead of all available arguments.
Disclaimer 3: For simplicity, we’re not showing the memory deallocation of tensors, but operations like scatter will not automatically free the memory of the source rank (if you don’t understand what I mean, that’s fine, it will become clear very soon).
Communication: Blocking vs. Non-Blocking
To work together, GPUs must exchange data. The CPU initiates the communication by enqueuing NCCL kernels into CUDA streams (if you don’t know what CUDA streams are, check out the first blog post of this series), but the actual data transfer happens directly between GPUs over the interconnect, bypassing the CPU’s main memory entirely. Ideally, the GPUs are connected with a high-speed interconnect like NVLink or InfiniBand (these interconnects are covered in the third post of this series).
This communication can be synchronous (blocking) or asynchronous (non-blocking), which we explore below.
Synchronous (Blocking) Communication
- Behavior: When you call a synchronous communication method, the host process stops and waits until the NCCL kernel is successfully enqueued on the currently active CUDA stream. Once enqueued, the function returns. This is usually simple and reliable. Note that the host is not waiting for the transfer to complete, only for the operation to be enqueued. However, it blocks that specific stream from moving on to the next operation until the NCCL kernel has executed to completion.
Asynchronous (Non-Blocking) Communication
- Behavior: When you call an asynchronous communication method, the call returns immediately, and the enqueuing operation happens in the background. It doesn’t enqueue into the currently active stream, but rather into a dedicated internal NCCL stream per device. This allows your CPU to continue with other tasks, a technique known as overlapping computation with communication. The asynchronous API is more complex because it can lead to undefined behavior if you don’t properly use .wait() (explained below) and modify data while it’s being transferred. However, mastering it is key to unlocking maximum performance in large-scale distributed training.
Point-to-Point (One-to-One)
These operations are not considered collectives, but they are foundational communication primitives. They facilitate direct data transfer between two specific ranks and are fundamental for tasks where one GPU needs to send specific information to another.
- Synchronous (Blocking): The host process waits for the operation to be enqueued to the CUDA stream before proceeding. The kernel is enqueued into the currently active stream.
  - torch.distributed.send(tensor, dst): Sends a tensor to a specified destination rank.
  - torch.distributed.recv(tensor, src): Receives a tensor from a source rank. The receiving tensor must be pre-allocated with the correct shape and dtype.
- Asynchronous (Non-Blocking): The host process initiates the enqueue operation and immediately continues with other tasks. The kernel is enqueued into a dedicated internal NCCL stream per device, which allows for overlapping communication with computation. These operations return a request (technically a Work object) that can be used to track the enqueuing status.
  - request = torch.distributed.isend(tensor, dst): Initiates an asynchronous send operation.
  - request = torch.distributed.irecv(tensor, src): Initiates an asynchronous receive operation.
  - request.wait(): Blocks the host only until the operation has been successfully enqueued on the CUDA stream. However, it does block the currently active CUDA stream from executing later kernels until this specific asynchronous operation completes.
  - request.wait(timeout): If you provide a timeout argument, the host behavior changes. It will block the CPU thread until the NCCL work completes or times out (raising an exception). In normal cases, users do not need to set the timeout.
  - request.is_completed(): Returns True if the operation has been successfully enqueued onto a CUDA stream. It can be used for polling. It does not guarantee that the actual data has been transferred.
When PyTorch launches an NCCL kernel, it automatically inserts a dependency (i.e. forces a synchronization) between your currently active stream and the NCCL stream. This means the NCCL stream won’t start until all previously enqueued work on the active stream finishes, which guarantees that the tensor being sent already holds its final values.
Similarly, calling req.wait() inserts a dependency in the other direction. Any work you enqueue on the current stream after req.wait() won’t execute until the NCCL operation completes, so you can safely use the received tensors.
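To make the two dependencies concrete, here is a toy model in plain Python. It is only a sketch of the ordering rules described above, not how CUDA or NCCL are implemented: the Event class, the run_streams helper, and the operation labels are all hypothetical names invented for this illustration. Each stream is a FIFO queue, and a stream stalls on a "wait" until another stream has reached the matching "record".

```python
from collections import deque


class Event:
    """Toy stand-in for a CUDA event: unset until a stream reaches its record op."""

    def __init__(self):
        self.done = False


def run_streams(streams):
    """Run each stream's FIFO queue in order, honouring cross-stream waits.

    `streams` maps a stream name to a list of ops. An op is one of:
      ("work", label)    run a kernel
      ("record", event)  mark the event as reached
      ("wait", event)    stall this stream until the event is reached
    Returns the work labels in the order they executed.
    """
    queues = {name: deque(ops) for name, ops in streams.items()}
    executed = []
    while any(queues.values()):
        progressed = False
        for name, queue in queues.items():
            if not queue:
                continue
            kind, arg = queue[0]
            if kind == "wait" and not arg.done:
                continue  # stalled: the other stream has not reached the event yet
            queue.popleft()
            progressed = True
            if kind == "record":
                arg.done = True
            elif kind == "work":
                executed.append(f"{name}:{arg}")
        if not progressed:
            raise RuntimeError("deadlock: every stream is waiting")
    return executed


send_ready = Event()  # dependency inserted when the collective is launched
comm_done = Event()   # dependency inserted by req.wait()

order = run_streams({
    # Listed first on purpose: it is polled first, yet it still has to stall.
    "nccl": [("wait", send_ready), ("work", "all_reduce"), ("record", comm_done)],
    "compute": [("work", "fill_tensor"), ("record", send_ready),
                ("work", "independent_math"),
                ("wait", comm_done), ("work", "use_result")],
})
print(order)
```

In this model, the communication never starts before the tensor is filled, the result is never used before the communication finishes, and the independent work in between is free to overlap with the transfer.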
Major “Gotchas” in NCCL
While send and recv are labeled “synchronous,” their behavior in NCCL can be confusing. A synchronous call on a CUDA tensor blocks the host CPU thread only until the data transfer kernel is enqueued to the stream, not until the data transfer completes. The CPU is then free to enqueue other tasks.
There is an exception: the very first call to torch.distributed.recv() in a process is truly blocking and waits for the transfer to finish, likely due to internal NCCL warm-up procedures. Subsequent calls will only block until the operation is enqueued.
Consider this example where rank 1 hangs because the CPU tries to access a tensor that the GPU has not yet received:
rank = torch.distributed.get_rank()
if rank == 0:
    t = torch.tensor([1,2,3], dtype=torch.float32, device=device)
    # torch.distributed.send(t, dst=1) # No send operation is performed
else: # rank == 1 (assuming only 2 ranks)
    t = torch.empty(3, dtype=torch.float32, device=device)
    torch.distributed.recv(t, src=0) # Blocks only until enqueued (after first run)
    print("This WILL print if NCCL is warmed-up")
    print(t) # CPU needs data from GPU, causing a block
    print("This will NOT print")
The CPU process at rank 1 gets stuck on print(t) because it triggers a host-device synchronization to access the tensor’s data, which never arrives.
If you run this code multiple times, notice that "This WILL print if NCCL is warmed-up" will not get printed in the later executions, since the CPU is still stuck at print(t).
Collectives
Every collective operation function supports both sync and async operations through the async_op argument. It defaults to False, meaning synchronous operations.
One-to-All Collectives
These operations involve one rank sending data to all other ranks in the group.
Broadcast
torch.distributed.broadcast(tensor, src): Copies a tensor from a single source rank (src) to all other ranks. Every process ends up with an identical copy of the tensor. The tensor parameter serves two purposes: (1) when the rank of the process matches the src, the tensor is the data being sent; (2) otherwise, tensor is used to store the received data.
rank = torch.distributed.get_rank()
if rank == 0: # source rank
    tensor = torch.tensor([1,2,3], dtype=torch.int64, device=device)
else: # destination ranks
    tensor = torch.empty(3, dtype=torch.int64, device=device)
torch.distributed.broadcast(tensor, src=0)

Scatter
torch.distributed.scatter(tensor, scatter_list, src): Distributes chunks of data from a source rank across all ranks. The scatter_list on the source rank contains multiple tensors, and each rank (including the source) receives one tensor from this list into its tensor variable. The destination ranks just pass None for the scatter_list.
# The scatter_list must be None for all non-source ranks.
# This example builds two chunks, so it assumes exactly 2 ranks.
rank = torch.distributed.get_rank()
scatter_list = None if rank != 0 else [torch.tensor([i, i+1]).to(device) for i in range(0,4,2)]
tensor = torch.empty(2, dtype=torch.int64).to(device)
torch.distributed.scatter(tensor, scatter_list, src=0)
print(f'Rank {rank} received: {tensor}')
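To see what each rank ends up holding after the two one-to-all collectives, here is a single-process simulation in plain Python. It is only a sketch of the semantics under the assumption of 2 ranks: lists stand in for tensors, and simulate_broadcast and simulate_scatter are hypothetical helpers, not part of torch.distributed.

```python
def simulate_broadcast(per_rank, src):
    """Every rank ends up with a copy of the source rank's data."""
    return [list(per_rank[src]) for _ in per_rank]


def simulate_scatter(scatter_list, world_size):
    """Rank i receives the i-th chunk of the source rank's scatter_list."""
    if len(scatter_list) != world_size:
        raise ValueError("scatter_list needs exactly one chunk per rank")
    return [list(chunk) for chunk in scatter_list]


world_size = 2  # assumed for illustration

# Broadcast: rank 0 holds [1, 2, 3]; rank 1 starts with an uninitialized buffer.
before = [[1, 2, 3], [None, None, None]]
after_broadcast = simulate_broadcast(before, src=0)

# Scatter: the same two chunks as in the snippet above, built on rank 0 only.
scatter_list = [[i, i + 1] for i in range(0, 4, 2)]
after_scatter = simulate_scatter(scatter_list, world_size)

for rank in range(world_size):
    print(f"Rank {rank}: broadcast -> {after_broadcast[rank]}, scatter -> {after_scatter[rank]}")
```

Note the difference: after a broadcast every rank holds the same full tensor, while after a scatter each rank holds a different chunk.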

All-to-One Collectives
These operations gather data from all ranks and consolidate it onto a single destination rank.
Reduce
torch.distributed.reduce(tensor, dst, op): Takes a tensor from each rank, applies a reduction operation (like SUM, MAX, MIN), and stores the final result on the destination rank (dst) only.
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], device=device)
torch.distributed.reduce(tensor, dst=0, op=torch.distributed.ReduceOp.SUM)
print(tensor) # Only rank 0 is guaranteed to hold the reduced result

Gather
torch.distributed.gather(tensor, gather_list, dst): Gathers a tensor from every rank into a list of tensors on the destination rank. The gather_list must be a list of tensors (correctly sized and typed) on the destination and None everywhere else.
# The gather_list must be None for all non-destination ranks.
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
gather_list = None if rank != 0 else [torch.zeros(3, dtype=torch.int64).to(device) for _ in range(world_size)]
t = torch.tensor([0+rank, 1+rank, 2+rank], dtype=torch.int64).to(device)
torch.distributed.gather(t, gather_list, dst=0)
print(f'After op, Rank {rank} has: {gather_list}')
The variable world_size is the total number of ranks. It can be obtained with torch.distributed.get_world_size(). But don’t worry about implementation details for now; the most important thing is to grasp the concepts.
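To make the two all-to-one collectives concrete, the following plain-Python sketch computes what the destination rank would hold for the inputs used in the snippets above. It assumes 2 ranks, uses lists in place of tensors, and all variable names are invented for this illustration.

```python
world_size = 2  # assumed for illustration
dst = 0

# Reduce with SUM: rank r contributes [r+1, r+2, r+3]; dst gets the element-wise sum.
reduce_inputs = [[rank + 1, rank + 2, rank + 3] for rank in range(world_size)]
reduced_on_dst = [sum(column) for column in zip(*reduce_inputs)]

# Gather: rank r contributes [0+r, 1+r, 2+r]; only dst ends up with the full list,
# every other rank keeps gather_list = None.
gather_inputs = [[0 + rank, 1 + rank, 2 + rank] for rank in range(world_size)]
gather_lists = [gather_inputs if rank == dst else None for rank in range(world_size)]

print(f"Rank {dst} after reduce: {reduced_on_dst}")
for rank in range(world_size):
    print(f"Rank {rank} after gather: {gather_lists[rank]}")
```

Reduce combines the contributions into one tensor of the original shape, whereas gather keeps them side by side as a list with one entry per rank.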

All-to-All Collectives
In these operations, every rank both sends data to and receives data from all other ranks.
All Reduce
torch.distributed.all_reduce(tensor, op): Same as reduce, but the result is stored on every rank instead of just one destination.
# Example for torch.distributed.all_reduce
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], dtype=torch.float32, device=device)
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} after all_reduce: {tensor}")

All Gather
torch.distributed.all_gather(tensor_list, tensor): Same as gather, but the gathered list of tensors is available on every rank.
# Example for torch.distributed.all_gather
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_tensor = torch.tensor([rank], dtype=torch.float32, device=device)
tensor_list = [torch.empty(1, dtype=torch.float32, device=device) for _ in range(world_size)]
torch.distributed.all_gather(tensor_list, input_tensor)
print(f"Rank {rank} gathered: {[t.item() for t in tensor_list]}")
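As a quick sanity check of the all_reduce and all_gather semantics, here is a plain-Python simulation of the two snippets above. It is a sketch that assumes 4 ranks and uses lists of floats in place of float32 tensors; none of the variable names come from torch.distributed.

```python
world_size = 4  # assumed for illustration; any positive value works

# all_reduce with SUM: rank r contributes [r+1, r+2, r+3]; every rank gets the column sums.
all_reduce_inputs = [[float(rank + 1), float(rank + 2), float(rank + 3)] for rank in range(world_size)]
summed = [sum(column) for column in zip(*all_reduce_inputs)]
after_all_reduce = [list(summed) for _ in range(world_size)]

# all_gather: rank r contributes [r]; every rank gets the full list, ordered by rank.
all_gather_inputs = [[float(rank)] for rank in range(world_size)]
after_all_gather = [[chunk[0] for chunk in all_gather_inputs] for _ in range(world_size)]

for rank in range(world_size):
    print(f"Rank {rank}: all_reduce -> {after_all_reduce[rank]}, all_gather -> {after_all_gather[rank]}")
```

In both cases every rank finishes with identical data, which is what distinguishes these operations from reduce and gather.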

Reduce Scatter
torch.distributed.reduce_scatter(output, input_list): Equivalent to performing a reduce operation on a list of tensors and then scattering the results. Each rank receives a different part of the reduced output.
# Example for torch.distributed.reduce_scatter
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_list = [torch.tensor([rank + i], dtype=torch.float32, device=device) for i in range(world_size)]
output = torch.empty(1, dtype=torch.float32, device=device)
torch.distributed.reduce_scatter(output, input_list, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} received reduced value: {output.item()}")
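Reduce scatter is the least intuitive of the three, so it helps to walk through the two conceptual steps by hand. The plain-Python sketch below reproduces the inputs of the snippet above under the assumption of 4 ranks. Lists stand in for tensors, the names are made up for this illustration, and a real backend does not necessarily perform these two steps separately.

```python
world_size = 4  # assumed for illustration

# input_lists[r][j] is the chunk that rank r contributes towards rank j's output.
input_lists = [[[float(rank + i)] for i in range(world_size)] for rank in range(world_size)]

# Step 1 (reduce): for each position j, sum the j-th chunks of all ranks element-wise.
reduced = [
    [sum(values) for values in zip(*(input_lists[rank][j] for rank in range(world_size)))]
    for j in range(world_size)
]

# Step 2 (scatter): rank j keeps only the j-th reduced chunk.
outputs = {rank: reduced[rank] for rank in range(world_size)}

for rank, value in outputs.items():
    print(f"Rank {rank} received reduced value: {value[0]}")
```

Each rank contributes world_size chunks but receives only one reduced chunk.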

Synchronization
The two most frequently used operations are request.wait() and torch.cuda.synchronize(). It’s crucial to understand the difference between the two:
- request.wait(): This is used for asynchronous operations. It synchronizes the currently active CUDA stream with that operation, ensuring the stream waits for the communication to complete before proceeding. In other words, it blocks the currently active CUDA stream until the data transfer finishes. On the host side, it only causes the host to wait until the kernel is enqueued; the host does not wait for the data transfer to complete.
- torch.cuda.synchronize(): This is a more forceful command that pauses the host CPU thread until all previously enqueued tasks on the GPU have finished. It guarantees that the GPU is completely idle before the CPU moves on, but it can create performance bottlenecks if used improperly. Whenever you need to perform benchmark measurements, you should use this to make sure you capture the exact moment the GPUs are done.
Conclusion
Congratulations on making it to the end! In this post, you learned about:
- Point-to-Point operations
- Sync and Async in NCCL
- Collective operations
- Synchronization methods
In the next blog post we’ll dive into PCIe, NVLink, and other mechanisms that enable communication in a distributed setting!















