|
| 1 | +.. inclusion-marker-start-do-not-remove |
| 2 | +
|
| 3 | +Process Sets: Concurrently Running Collective Operations |
| 4 | +======================================================== |
| 5 | + |
| 6 | +Most Horovod operations in TensorFlow, PyTorch, or MXNet feature a ``process_set`` argument: By setting up different |
| 7 | +process sets you may have multiple subsets of the world of Horovod processes run distinct collective operations in |
| 8 | +parallel. Besides Horovod's fundamental operations like ``hvd.allgather``, ``hvd.allreduce``, ``hvd.alltoall``, |
| 9 | +``hvd.broadcast``, or ``hvd.grouped_allreduce``, also many high-level utility objects such as |
| 10 | +``hvd.DistributedOptimizer`` come with support for process sets. |
| 11 | + |
| 12 | +As an example consider building a Horovod model to be trained by four worker processes with two concurrent allreduce |
| 13 | +operations on the "even" or "odd" subset. In the following we will see three ways to configure Horovod to use an even |
| 14 | +and an odd process set, offering you as much flexibility as you need. The code examples are presented for TensorFlow, |
| 15 | +but the interface for the other supported frameworks is equivalent. |
| 16 | + |
| 17 | +1) Static process sets |
| 18 | +---------------------- |
| 19 | + |
| 20 | +.. code-block:: python |
| 21 | +
|
| 22 | + # on all ranks |
| 23 | + even_set = hvd.ProcessSet([0,2]) |
| 24 | + odd_set = hvd.ProcessSet([1,3]) |
| 25 | + hvd.init(process_sets=[even_set, odd_set]) |
| 26 | +
|
| 27 | + for p in [hvd.global_process_set, even_set, odd_set]: |
| 28 | + print(p) |
| 29 | + # ProcessSet(process_set_id=0, ranks=[0, 1, 2, 3], mpi_comm=None) |
| 30 | + # ProcessSet(process_set_id=1, ranks=[0, 2], mpi_comm=None) |
| 31 | + # ProcessSet(process_set_id=2, ranks=[1, 3], mpi_comm=None) |
| 32 | +
|
| 33 | + # on ranks 0 and 2 |
| 34 | + result = hvd.allreduce(tensor_for_even_ranks, process_set=even_set) |
| 35 | +
|
| 36 | + # on ranks 1 and 3 |
| 37 | + result = hvd.allreduce(tensor_for_odd_ranks, process_set=odd_set) |
| 38 | +
|
| 39 | +Having initialized Horovod like this, the configuration of process sets cannot be changed without restarting the |
| 40 | +program. If you only use the default global process set (``hvd.global_process_set``), there is no impact on |
| 41 | +performance. |
| 42 | + |
| 43 | +2) Static process sets from MPI communicators |
| 44 | +--------------------------------------------- |
| 45 | + |
| 46 | +.. code-block:: python |
| 47 | +
|
| 48 | + # on all ranks |
| 49 | + from mpi4py import MPI |
| 50 | + comm = MPI.COMM_WORLD |
| 51 | + subcomm = MPI.COMM_WORLD.Split(color=MPI.COMM_WORLD.rank % 2, |
| 52 | + key=MPI.COMM_WORLD.rank) |
| 53 | +
|
| 54 | + split_process_set = hvd.ProcessSet(subcomm) |
| 55 | +
|
| 56 | + hvd.init(comm, process_sets=[split_process_set]) |
| 57 | +
|
| 58 | + for p in [hvd.global_process_set, split_process_set]: |
| 59 | + print(p) |
| 60 | + # ProcessSet(process_set_id=0, ranks=[0, 1, 2, 3], mpi_comm=<mpi4py.MPI.Intracomm object at 0x7fb817323dd0>) |
| 61 | + # ProcessSet(process_set_id=1, ranks=[0, 2], mpi_comm=<mpi4py.MPI.Intracomm object at 0x7fb87e2ddfb0>) |
| 62 | + ## (split_process_set differs by rank) |
| 63 | +
|
| 64 | + # on ranks 0 and 2 |
| 65 | + result = hvd.allreduce(tensor_for_even_ranks, process_set=split_process_set) |
| 66 | +
|
| 67 | + # on ranks 1 and 3 |
| 68 | + result = hvd.allreduce(tensor_for_odd_ranks, process_set=split_process_set) |
| 69 | +
|
| 70 | +If you are already using multiple MPI communicators in your distributed program, you can plug them right in. |
| 71 | + |
| 72 | +3) Dynamic process sets |
| 73 | +----------------------- |
| 74 | + |
| 75 | +.. code-block:: python |
| 76 | +
|
| 77 | + # on all ranks |
| 78 | + hvd.init(process_sets="dynamic") # alternatively set HOROVOD_DYNAMIC_PROCESS_SETS=1 |
| 79 | + even_set = hvd.add_process_set([0,2]) |
| 80 | + odd_set = hvd.add_process_set([1,3]) |
| 81 | +
|
| 82 | + for p in [hvd.global_process_set, even_set, odd_set]: |
| 83 | + print(p) |
| 84 | + # ProcessSet(process_set_id=0, ranks=[0, 1, 2, 3], mpi_comm=None) |
| 85 | + # ProcessSet(process_set_id=1, ranks=[0, 2], mpi_comm=None) |
| 86 | + # ProcessSet(process_set_id=2, ranks=[1, 3], mpi_comm=None) |
| 87 | +
|
| 88 | + # on ranks 0 and 2 |
| 89 | + result = hvd.allreduce(tensor_for_even_ranks, process_set=even_set) |
| 90 | +
|
| 91 | + # on ranks 1 and 3 |
| 92 | + result = hvd.allreduce(tensor_for_odd_ranks, process_set=odd_set) |
| 93 | +
|
| 94 | +The most flexible setup is achieved with "dynamic" process sets. Process sets can be registered and deregistered |
| 95 | +dynamically at any time after initializing Horovod via ``hvd.add_process_set()`` and ``hvd.remove_process_set()``. |
| 96 | +Calls to these functions must be made identically and in the same order by all processes. |
| 97 | + |
| 98 | +Note that dynamic process sets come with some slight extra synchronization overhead. |
| 99 | + |
| 100 | +.. inclusion-marker-end-do-not-remove |
0 commit comments