Commit 8a12dc9

Add Reducescatter op (NCCL, MPI, Gloo) (horovod#3299)
Signed-off-by: Max H. Gerlach <[email protected]>
Co-authored-by: Jesse Benson (AI) <[email protected]>
Co-authored-by: Jesse Benson <[email protected]>

1 parent e02bdca · commit 8a12dc9


49 files changed: +2845 −111 lines

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
@@ -8,10 +8,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
 ### Added
 
+- Added `hvd.reducescatter()` operation with implementations in NCCL, MPI, and Gloo. ([#3299](https://github.com/horovod/horovod/pull/3299))
+
 ### Changed
 
+- MXNet: Updated allreduce functions to newer `op` API. ([#3299](https://github.com/horovod/horovod/pull/3299))
+
 ### Deprecated
 
+- MXNet: Deprecated `average` argument of allreduce functions. ([#3299](https://github.com/horovod/horovod/pull/3299))
+
 ### Removed
 
 ### Fixed
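
The headline change is the new `hvd.reducescatter()` call. Below is a minimal usage sketch with the TensorFlow frontend; the `op=` keyword is taken from the Keras helper added later in this commit, while the even split of the reduced tensor's first dimension across ranks is an assumption rather than documented behavior of this commit.

```python
# Hedged usage sketch of the new reducescatter op with the TensorFlow frontend.
# Run under e.g. `horovodrun -np 4 python this_script.py`.
import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Every rank contributes a tensor of identical shape, as required by the
# controller-side shape check added in this commit.
local = tf.ones([8, 10]) * hvd.rank()

# Sum across all ranks, then receive this rank's slice of the result.
# Assumption: with 4 ranks, each rank gets a [2, 10] piece of the [8, 10] sum.
piece = hvd.reducescatter(local, op=hvd.Sum)
print(f"rank {hvd.rank()} got shape {piece.shape}")
```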

CMakeLists.txt

Lines changed: 5 additions & 4 deletions
@@ -110,8 +110,9 @@ set_gpu_op(HOROVOD_GPU_ALLREDUCE "MPI;NCCL;DDL")
 set_gpu_op(HOROVOD_GPU_ALLGATHER "MPI;NCCL")
 set_gpu_op(HOROVOD_GPU_BROADCAST "MPI;NCCL")
 set_gpu_op(HOROVOD_GPU_ALLTOALL "MPI;NCCL")
+set_gpu_op(HOROVOD_GPU_REDUCESCATTER "MPI;NCCL")
 
-foreach(VAR in ITEMS HOROVOD_GPU_ALLREDUCE HOROVOD_GPU_ALLGATHER HOROVOD_GPU_BROADCAST HOROVOD_GPU_ALLTOALL)
+foreach(VAR in ITEMS HOROVOD_GPU_ALLREDUCE HOROVOD_GPU_ALLGATHER HOROVOD_GPU_BROADCAST HOROVOD_GPU_ALLTOALL HOROVOD_GPU_REDUCESCATTER)
   if(DEFINED ${VAR})
     string(SUBSTRING ${${VAR}} 0 1 ${VAR})
     convert_to_ascii_dec(ASCII_DEC ${${VAR}})
@@ -197,7 +198,7 @@ macro(ADD_CUDA)
   endif()
 endmacro()
 
-if(DEFINED HOROVOD_GPU_ALLREDUCE OR DEFINED HOROVOD_GPU_ALLGATHER OR DEFINED HOROVOD_GPU_BROADCAST OR DEFINED HOROVOD_GPU_ALLTOALL)
+if(DEFINED HOROVOD_GPU_ALLREDUCE OR DEFINED HOROVOD_GPU_ALLGATHER OR DEFINED HOROVOD_GPU_BROADCAST OR DEFINED HOROVOD_GPU_ALLTOALL OR DEFINED HOROVOD_GPU_REDUCESCATTER)
   if(NOT DEFINED HOROVOD_GPU OR HOROVOD_GPU STREQUAL "CUDA")
     add_cuda()
   elseif(HOROVOD_GPU STREQUAL "ROCM")
@@ -215,7 +216,7 @@ if(DEFINED HOROVOD_GPU_ALLREDUCE OR DEFINED HOROVOD_GPU_ALLGATHER OR DEFINED HOR
 endif()
 
 # NCCL
-if(HOROVOD_GPU_ALLREDUCE STREQUAL "N" OR HOROVOD_GPU_ALLGATHER STREQUAL "N" OR HOROVOD_GPU_BROADCAST STREQUAL "N" OR HOROVOD_GPU_ALLTOALL STREQUAL "N")
+if(HOROVOD_GPU_ALLREDUCE STREQUAL "N" OR HOROVOD_GPU_ALLGATHER STREQUAL "N" OR HOROVOD_GPU_BROADCAST STREQUAL "N" OR HOROVOD_GPU_ALLTOALL STREQUAL "N" OR HOROVOD_GPU_REDUCESCATTER STREQUAL "N")
   if(HAVE_ROCM)
     find_package(rccl REQUIRED)
     include_directories(SYSTEM ${RCCL_INCLUDE_DIRS})
@@ -256,7 +257,7 @@ if(DEFINED CCL_ROOT)
 endif()
 
 set(HOROVOD_ALLOW_MIXED_GPU_IMPL $ENV{HOROVOD_ALLOW_MIXED_GPU_IMPL})
-if(HOROVOD_GPU_ALLREDUCE STREQUAL "N" AND (HOROVOD_GPU_ALLGATHER STREQUAL "M" OR HOROVOD_GPU_BROADCAST STREQUAL "M" OR HOROVOD_GPU_ALLTOALL STREQUAL "M") AND
+if(HOROVOD_GPU_ALLREDUCE STREQUAL "N" AND (HOROVOD_GPU_ALLGATHER STREQUAL "M" OR HOROVOD_GPU_BROADCAST STREQUAL "M" OR HOROVOD_GPU_ALLTOALL STREQUAL "M" OR HOROVOD_GPU_REDUCESCATTER STREQUAL "M") AND
     NOT HOROVOD_ALLOW_MIXED_GPU_IMPL STREQUAL "1")
   message(FATAL_ERROR "You should not mix NCCL and MPI GPU due to a possible deadlock.\n"
                       "If you are sure you want to mix them, set the "

docs/concepts.rst

Lines changed: 4 additions & 0 deletions
@@ -31,6 +31,10 @@ a training script on 4 servers, each having 4 GPUs. If we launched one copy of t
 .. image:: http://mpitutorial.com/tutorials/mpi-broadcast-and-collective-communication/broadcast_pattern.png
    :alt: Broadcast Illustration
 
+* *Reducescatter* is an operation that aggregates data among multiple processes and scatters the data across them. *Reducescatter* is used to average dense tensors then split them across processes. Here's an illustration from the `Nvidia developer guide <https://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/docs/usage/operations.html#reducescatter>`__:
+
+  .. image:: https://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/docs/_images/reducescatter.png
+     :alt: Reducescatter Illustration
 
 * *Alltoall* is an operation to exchange data between all processes. *Alltoall* may be useful to implement neural networks with advanced architectures that span multiple devices.
 
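
As a supplement to the illustration referenced above, here is a small single-process sketch of the reduce-scatter semantics: aggregate the inputs of all participants, then hand each participant one slice of the result. It is simulated with NumPy; the even split along the first axis mirrors the NCCL picture and is an assumption about the exact partitioning, not a statement about Horovod's implementation.

```python
# Single-process NumPy simulation of reduce-scatter semantics.
import numpy as np

def simulated_reducescatter(inputs):
    """inputs: list with one identically shaped array per simulated rank."""
    total = np.sum(inputs, axis=0)               # the "reduce" step
    return np.array_split(total, len(inputs))    # the "scatter" step

inputs = [np.full((8, 10), rank, dtype=np.float32) for rank in range(4)]
pieces = simulated_reducescatter(inputs)
for rank, piece in enumerate(pieces):
    print(f"rank {rank} receives shape {piece.shape}")  # (2, 10) each
```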

docs/gpus.rst

Lines changed: 1 addition & 1 deletion
@@ -82,7 +82,7 @@ use it instead:
     $ HOROVOD_GPU_ALLREDUCE=MPI pip install --no-cache-dir horovod
 
 
-Additionally, if your MPI vendor's implementation supports *allgather* and *broadcast* operations on GPU, you can
+Additionally, if your MPI vendor's implementation supports *allgather*, *broadcast*, and *reducescatter* operations on GPU, you can
 configure Horovod to use them as well:
 
 .. code-block:: bash

docs/install.rst

Lines changed: 3 additions & 1 deletion
@@ -245,7 +245,9 @@ Possible values are given in curly brackets: {}.
 * ``HOROVOD_GPU_ALLREDUCE`` - {NCCL, MPI}. Framework to use for GPU tensor allreduce.
 * ``HOROVOD_GPU_ALLGATHER`` - {NCCL, MPI}. Framework to use for GPU tensor allgather.
 * ``HOROVOD_GPU_BROADCAST`` - {NCCL, MPI}. Framework to use for GPU tensor broadcast.
-* ``HOROVOD_ALLOW_MIXED_GPU_IMPL`` - {1}. Allow Horovod to install with NCCL allreduce and MPI GPU allgather / broadcast. Not recommended due to a possible deadlock.
+* ``HOROVOD_GPU_ALLTOALL`` - {NCCL, MPI}. Framework to use for GPU tensor alltoall.
+* ``HOROVOD_GPU_REDUCESCATTER`` - {NCCL, MPI}. Framework to use for GPU tensor reducescatter.
+* ``HOROVOD_ALLOW_MIXED_GPU_IMPL`` - {1}. Allow Horovod to install with NCCL allreduce and MPI GPU allgather / broadcast / alltoall / reducescatter. Not recommended due to a possible deadlock.
 * ``HOROVOD_CPU_OPERATIONS`` - {MPI, GLOO, CCL}. Framework to use for CPU tensor allreduce, allgather, and broadcast.
 * ``HOROVOD_CMAKE`` - path to the CMake binary used to build Horovod.
 * ``HOROVOD_WITH_TENSORFLOW`` - {1}. Require Horovod to install with TensorFlow support enabled.
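
Because these flags only take effect at build time, it can be useful to check afterwards which controllers and collective backends the installed wheel was actually compiled against. A small sketch, assuming the build-info helpers `mpi_built()`, `gloo_built()`, and `nccl_built()` are re-exported by `horovod.tensorflow` as in other Horovod releases:

```python
# Sketch: inspect what the installed Horovod build supports.
import horovod.tensorflow as hvd

print("MPI built: ", hvd.mpi_built())
print("Gloo built:", hvd.gloo_built())
print("NCCL built:", hvd.nccl_built())  # True if the build linked against NCCL
```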

horovod/_keras/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -188,6 +188,10 @@ def broadcast(backend, value, root_rank, name):
     return _eval(backend, hvd.broadcast(tf.constant(value, name=name), root_rank))
 
 
+def reducescatter(backend, value, name, op):
+    return _eval(backend, hvd.reducescatter(tf.constant(value, name=name), op=op))
+
+
 def load_model(keras, wrap_optimizer, optimizer_modules, filepath, custom_optimizers, custom_objects):
     horovod_objects = {
         subclass.__name__.lower(): wrap_optimizer(subclass)
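
The helper takes the Keras `backend` as an explicit first argument so the same implementation can serve both the `keras` and `tf.keras` frontends. A hypothetical framework-level binding might look like the sketch below; the module paths and the wrapper's default arguments are illustrative, not part of this diff.

```python
# Illustrative only: how a framework module might expose the shared helper.
# `_impl` stands for the shared horovod._keras module shown above and `K` for
# the chosen Keras backend.
import tensorflow.keras.backend as K
import horovod._keras as _impl
import horovod.tensorflow as hvd


def reducescatter(value, name=None, op=hvd.Sum):
    """Reduce `value` across ranks and return this rank's slice as a NumPy array."""
    return _impl.reducescatter(K, value, name, op)
```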

horovod/common/common.cc

Lines changed: 2 additions & 2 deletions
@@ -101,7 +101,7 @@ int TensorShape::dims() const {
 
 int64_t TensorShape::dim_size(int idx) const {
   assert(idx >= 0);
-  assert(idx < shape_.size());
+  assert(idx < (int)shape_.size());
   return shape_[idx];
 }
 
@@ -165,7 +165,7 @@ void parse_and_set_affinity(const char* affinity, int local_size, int local_rank
     auto core_id_str = strsep(&tmp, ",");
     errno = 0;
     auto core_id = std::strtol(core_id_str, &endptr, 10);
-    if (errno == ERANGE && (core_id == LONG_MAX || core_id == LONG_MIN)
+    if ((errno == ERANGE && (core_id == LONG_MAX || core_id == LONG_MIN))
         || (errno != 0 && core_id == 0)){
       LOG(ERROR) << "Core ID value is invalid in " << HOROVOD_THREAD_AFFINITY
                  << "=" << affinity;

horovod/common/common.h

Lines changed: 2 additions & 0 deletions
@@ -83,6 +83,7 @@ namespace common {
 #define MEMCPY_IN_SHARED_BUFFER "MEMCPY_IN_SHARED_BUFFER"
 #define MPI_ALLREDUCE "MPI_ALLREDUCE"
 #define MPI_ADASUM_ALLREDUCE "MPI_ADASUM_ALLREDUCE"
+#define MPI_REDUCESCATTER "MPI_REDUCESCATTER"
 #define MEMCPY_OUT_HOST_BUFFER "MEMCPY_OUT_HOST_BUFFER"
 #define NCCL_ALLREDUCE "NCCL_ALLREDUCE"
 #define MEMCPY_OUT_FUSION_BUFFER "MEMCPY_OUT_FUSION_BUFFER"
@@ -102,6 +103,7 @@ namespace common {
 #define GLOO_ALLREDUCE "GLOO_ALLREDUCE"
 #define GLOO_ALLGATHER "GLOO_ALLGATHER"
 #define GLOO_BCAST "GLOO_BCAST"
+#define GLOO_REDUCESCATTER "GLOO_REDUCESCATTER"
 #define HOROVOD_ELASTIC "HOROVOD_ELASTIC"
 
 // Horovod knobs.

horovod/common/controller.cc

Lines changed: 25 additions & 4 deletions
@@ -536,11 +536,12 @@ Response Controller::ConstructResponse(const std::string& name, int joined_size)
     }
   }
 
-  // If we are doing an allreduce or broadcast, check that all tensor shapes are
-  // identical.
+  // If we are doing an allreduce, broadcast, or reducescatter check that all
+  // tensor shapes are identical.
   if (message_type == Request::ALLREDUCE ||
       message_type == Request::ADASUM ||
-      message_type == Request::BROADCAST) {
+      message_type == Request::BROADCAST ||
+      message_type == Request::REDUCESCATTER) {
     TensorShape tensor_shape;
     for (auto dim : requests[0].tensor_shape()) {
       tensor_shape.AddDim(dim);
@@ -673,6 +674,19 @@ Response Controller::ConstructResponse(const std::string& name, int joined_size)
     }
   }
 
+  if (message_type == Request::REDUCESCATTER) {
+    if (joined_size > 0) {
+      error = true;
+      error_message_stream << "Reducescatter is not supported with Join at this time.";
+    }
+
+    TensorShape tensor_shape;
+    for (auto dim : requests[0].tensor_shape()) {
+      tensor_shape.AddDim(dim);
+    }
+    tensor_sizes.push_back(tensor_shape.num_elements());
+  }
+
   if (message_type == Request::ALLREDUCE || message_type == Request::ADASUM) {
     TensorShape tensor_shape;
     for (auto dim : requests[0].tensor_shape()) {
@@ -756,6 +770,12 @@ Response Controller::ConstructResponse(const std::string& name, int joined_size)
     response.set_response_type(Response::BROADCAST);
   } else if (message_type == Request::ALLTOALL) {
     response.set_response_type(Response::ALLTOALL);
+  } else if (message_type == Request::REDUCESCATTER) {
+    response.set_response_type(Response::REDUCESCATTER);
+    for (auto dim : tensor_sizes) {
+      response.add_tensor_size(dim);
+    }
+    response.set_tensor_type(data_type);
   } else if (message_type == Request::ADASUM) {
     response.set_response_type(Response::ADASUM);
     for (auto dim : tensor_sizes) {
@@ -815,7 +835,8 @@ void Controller::FuseResponses(std::deque<Response>& responses,
     responses.pop_front();
     int64_t tensor_size = 0;
     if (response.response_type() == Response::ResponseType::ALLREDUCE ||
-        response.response_type() == Response::ResponseType::ADASUM) {
+        response.response_type() == Response::ResponseType::ADASUM ||
+        response.response_type() == Response::ResponseType::REDUCESCATTER) {
       // Attempt to add more responses to this fused response.
 
       tensor_size = response.tensor_sizes()[0] * GetTypeSize(response.tensor_type());
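
The controller-side changes above boil down to two pieces of bookkeeping: reducescatter requests must carry identical tensor shapes on every rank, and the element count of that shape feeds the fusion-size accounting in `FuseResponses`. A rough Python analogue of that logic, using hypothetical names:

```python
# Rough Python analogue (hypothetical names) of the controller bookkeeping
# added for REDUCESCATTER: validate that all ranks sent the same shape, then
# record the element count that the fusion logic multiplies by the dtype size.
import numpy as np

def construct_reducescatter_response(request_shapes, dtype=np.float32):
    first = request_shapes[0]
    for rank, shape in enumerate(request_shapes[1:], start=1):
        if shape != first:
            raise ValueError(
                f"Mismatched reducescatter shapes: rank 0 sent {first}, "
                f"rank {rank} sent {shape}")
    num_elements = int(np.prod(first))
    fusion_bytes = num_elements * np.dtype(dtype).itemsize
    return {"tensor_sizes": [num_elements], "fusion_bytes": fusion_bytes}

print(construct_reducescatter_response([(8, 10), (8, 10), (8, 10), (8, 10)]))
```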

horovod/common/message.cc

Lines changed: 6 additions & 0 deletions
@@ -102,6 +102,9 @@ const std::string& Request::RequestType_Name(RequestType value) {
   case RequestType::BROADCAST:
     static const std::string broadcast("BROADCAST");
     return broadcast;
+  case RequestType::REDUCESCATTER:
+    static const std::string reducescatter("REDUCESCATTER");
+    return reducescatter;
   case RequestType::JOIN:
     static const std::string join("JOIN");
     return join;
@@ -294,6 +297,9 @@ const std::string& Response::ResponseType_Name(ResponseType value) {
   case ResponseType::BROADCAST:
     static const std::string broadcast("BROADCAST");
     return broadcast;
+  case ResponseType::REDUCESCATTER:
+    static const std::string reducescatter("REDUCESCATTER");
+    return reducescatter;
   case ResponseType::JOIN:
     static const std::string join("JOIN");
     return join;
