Commit a729ba7

RayExecutor V2: Dynamic executor for elastic and static jobs (horovod#3230)
This resolves horovod#3190 by adding elastic parameters to the RayExecutor API for Horovod: the API now supports both static (non-elastic) and elastic Horovod jobs.

Example of a static job (identical to the current RayExecutor):

```python
from horovod.ray import RayExecutor

ray.init()
executor = RayExecutor(settings, num_workers=num_workers, use_gpu=True)
executor.start()

def simple_fn():
    hvd.init()
    print("hvd rank", hvd.rank())
    return hvd.rank()

result = executor.run(simple_fn)
assert len(set(result)) == hosts * num_slots
executor.shutdown()
```

Example of an elastic job:

```python
from horovod.ray import RayExecutor
import horovod.torch as hvd

def training_fn():
    hvd.init()
    model = Model()
    torch.cuda.set_device(hvd.local_rank())

    @hvd.elastic.run
    def train(state):
        for state.epoch in range(state.epoch, epochs):
            ...
            state.commit()

    state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
    state.register_reset_callbacks([on_state_reset])
    train(state)
    return

executor = RayExecutor(settings, min_workers=1, use_gpu=True, cpus_per_worker=2)
executor.start()
executor.run(training_fn)
```

Signed-off-by: Abin Shahab <[email protected]>
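`RayExecutor.run` fans the target function out to every worker and returns one value per worker, which is why the static example above can assert on `len(set(result))`. The following is a toy, Ray-free illustration of that fan-out contract; `LocalExecutor` is hypothetical (not part of Horovod), and the rank is passed explicitly where Horovod would supply it via `hvd.rank()`:

```python
from concurrent.futures import ThreadPoolExecutor

class LocalExecutor:
    """Toy stand-in for RayExecutor: invokes fn once per worker and
    returns the list of per-rank results."""
    def __init__(self, num_workers):
        self.num_workers = num_workers

    def run(self, fn, args=None):
        args = args or []
        with ThreadPoolExecutor(max_workers=self.num_workers) as pool:
            futures = [pool.submit(fn, rank, *args)
                       for rank in range(self.num_workers)]
            return [f.result() for f in futures]

executor = LocalExecutor(num_workers=4)
# Each "worker" reports its rank, so the result set has one entry per worker.
result = executor.run(lambda rank: rank)
assert len(set(result)) == 4
```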
1 parent 06aa579 commit a729ba7

File tree

8 files changed: +1236 −119 lines


CHANGELOG.md

Lines changed: 3 additions & 1 deletion
```diff
@@ -7,13 +7,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

 ## [Unreleased] - YYYY-MM-DD

 ### Added
+- Added Elastic keyword parameters to RayExecutor API: This API supports both static(non-elastic) and elastic horovod jobs. This resolves issue:
+[#3190](https://github.com/horovod/horovod/issues/3190).

 - TensorFlow: Added in-place broadcasting of variables. ([#3128](https://github.com/horovod/horovod/pull/3128))

 ### Changed

 ### Deprecated
-
+- Deprecated ElasticRayExecutor APIs in favor of the new RayExecutor API for issue: [#3190](https://github.com/horovod/horovod/issues/3190).

 ### Removed

 ### Fixed
```
docs/ray.rst

Lines changed: 6 additions & 4 deletions
```diff
@@ -110,7 +110,7 @@ A unique feature of Ray is its support for `stateful Actors <https://docs.ray.io
 Elastic Ray Executor
 --------------------

-Ray also supports `elastic execution <elastic.rst>`_ via :ref:`the ElasticRayExecutor <horovod_ray_api>`. Similar to default Horovod, the difference between the non-elastic and elastic versions of Ray is that the hosts and number of workers is dynamically determined at runtime.
+Ray also supports `elastic execution <elastic.rst>`_ via :ref:`the RayExecutor <horovod_ray_api>`. Similar to default Horovod, the difference between the non-elastic and elastic versions of Ray is that the hosts and number of workers is dynamically determined at runtime.

 You must first set up `a Ray cluster`_. Ray clusters can support autoscaling for any cloud provider (AWS, GCP, Azure).

@@ -153,10 +153,12 @@ You can then attach to the underlying Ray cluster and execute the training funct
 .. code-block:: python

     import ray
+    from horovod.ray import RayExecutor
+
     ray.init(address="auto")  # attach to the Ray cluster
-    settings = ElasticRayExecutor.create_settings(verbose=True)
-    executor = ElasticRayExecutor(
-        settings, use_gpu=True, cpus_per_slot=2)
+    settings = RayExecutor.create_settings(verbose=True)
+    executor = RayExecutor(
+        settings, min_workers=1, use_gpu=True, cpus_per_slot=2)
     executor.start()
     executor.run(training_fn)
```
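The elastic executor relies on workers periodically committing their training state so that, when the worker set changes, surviving workers can roll back to the last commit and continue. Below is a framework-free toy sketch of that commit/restore pattern; `ToyState` is illustrative only, a stand-in for `hvd.elastic.TorchState`, not a Horovod API:

```python
import copy

class ToyState:
    """Minimal stand-in for an elastic state object: holds attributes and
    supports commit (snapshot) and restore (roll back to last snapshot)."""
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
        self._snapshot = {}
        self.commit()

    def commit(self):
        self._snapshot = copy.deepcopy(
            {k: v for k, v in self.__dict__.items() if k != "_snapshot"})

    def restore(self):
        self.__dict__.update(copy.deepcopy(self._snapshot))

state = ToyState(epoch=0, losses=[])
for state.epoch in range(state.epoch, 5):
    state.losses.append(1.0 / (state.epoch + 1))
    if state.epoch == 2:
        # Simulate a worker-set change discovered mid-epoch:
        # discard the partial work and roll back to the last commit.
        state.restore()
        break
    state.commit()

# Epochs 0 and 1 were committed; the partial epoch-2 work was discarded.
print(state.epoch, len(state.losses))  # → 1 2
```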

horovod/ray/adapter.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Dict, Callable, Any, Optional, List
3+
from dataclasses import dataclass
4+
5+
@dataclass
6+
class BaseParams:
7+
cpus_per_worker: int = 1
8+
use_gpu: bool = False
9+
gpus_per_worker: Optional[int] = None
10+
def __post_init__(self):
11+
if self.gpus_per_worker and not self.use_gpu:
12+
raise ValueError("gpus_per_worker is set, but use_gpu is False. "
13+
"use_gpu must be True if gpus_per_worker is "
14+
"set. ")
15+
if self.use_gpu and isinstance(self.gpus_per_worker,
16+
int) and self.gpus_per_worker < 1:
17+
raise ValueError(
18+
f"gpus_per_worker must be >= 1: Got {self.gpus_per_worker}.")
19+
self.gpus_per_worker = self.gpus_per_worker or int(self.use_gpu)
20+
21+
22+
class Adapter(ABC):
23+
"""Adapter for executing Ray calls for various types(e.g. static and elastic)
24+
Horovod jobs.
25+
"""
26+
@abstractmethod
27+
def start(self,
28+
executable_cls: type = None,
29+
executable_args: Optional[List] = None,
30+
executable_kwargs: Optional[Dict] = None,
31+
extra_env_vars: Optional[Dict] = None):
32+
"""Starts the Adapter
33+
34+
Args:
35+
executable_cls (type): The class that will be created within
36+
an actor (BaseHorovodWorker). This will allow Horovod
37+
to establish its connections and set env vars.
38+
executable_args (List): Arguments to be passed into the
39+
worker class upon initialization.
40+
executable_kwargs (Dict): Keyword arguments to be passed into the
41+
worker class upon initialization.
42+
extra_env_vars (Dict): Environment variables to be set
43+
on the actors (worker processes) before initialization.
44+
"""
45+
raise NotImplementedError("Method must be implemented in a subclass")
46+
47+
@abstractmethod
48+
def execute(self, fn: Callable[["executable_cls"], Any],
49+
callbacks: Optional[List[Callable]] = None) -> List[Any]:
50+
"""Executes the provided function on all workers.
51+
52+
Args:
53+
fn: Target function to be invoked on every object.
54+
callbacks: List of callables. Each callback must either
55+
be a callable function or a class that implements __call__.
56+
Every callback will be invoked on every value logged
57+
by the rank 0 worker.
58+
59+
Returns:
60+
Deserialized return values from the target function.
61+
"""
62+
raise NotImplementedError("Method must be implemented in a subclass")
63+
64+
@abstractmethod
65+
def run(self,
66+
fn: Callable[[Any], Any],
67+
args: Optional[List] = None,
68+
kwargs: Optional[Dict] = None,
69+
callbacks: Optional[List[Callable]] = None) -> List[Any]:
70+
"""Executes the provided function on all workers.
71+
72+
Args:
73+
fn: Target function that can be executed with arbitrary
74+
args and keyword arguments.
75+
args: List of arguments to be passed into the target function.
76+
kwargs: Dictionary of keyword arguments to be
77+
passed into the target function.
78+
callbacks: List of callables. Each callback must either
79+
be a callable function or a class that implements __call__.
80+
Every callback will be invoked on every value logged
81+
by the rank 0 worker.
82+
83+
Returns:
84+
Deserialized return values from the target function.
85+
"""
86+
raise NotImplementedError("Method must be implemented in a subclass")
87+
88+
@abstractmethod
89+
def run_remote(self,
90+
fn: Callable[[Any], Any],
91+
args: Optional[List] = None,
92+
kwargs: Optional[Dict] = None,
93+
callbacks: Optional[List[Callable]] = None):
94+
95+
"""Executes the provided function on all workers.
96+
97+
Args:
98+
fn: Target function that can be executed with arbitrary
99+
args and keyword arguments.
100+
args: List of arguments to be passed into the target function.
101+
kwargs: Dictionary of keyword arguments to be
102+
passed into the target function.
103+
104+
Returns:
105+
list: List of ObjectRefs that you can run `ray.get` on to
106+
retrieve values.
107+
"""
108+
raise NotImplementedError("Method must be implemented in a subclass")
109+
110+
@abstractmethod
111+
def execute_single(self,
112+
fn: Callable[["executable_cls"], Any]) -> List[Any]:
113+
"""Executes the provided function on the rank 0 worker (chief).
114+
115+
Args:
116+
fn: Target function to be invoked on the chief object.
117+
118+
Returns:
119+
Deserialized return values from the target function.
120+
"""
121+
raise NotImplementedError("Method must be implemented in a subclass")
122+
123+
@abstractmethod
124+
def shutdown(self):
125+
"""Destroys the adapter."""
126+
raise NotImplementedError("Method must be implemented in a subclass")
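The `BaseParams` validation can be exercised on its own. The snippet below reproduces just the dataclass from `horovod/ray/adapter.py` so its `__post_init__` behaviour (defaulting `gpus_per_worker` to `int(use_gpu)` and rejecting inconsistent GPU settings) can be checked without Ray or Horovod installed:

```python
from dataclasses import dataclass
from typing import Optional

# Copy of BaseParams from horovod/ray/adapter.py, reproduced here so the
# validation logic can run standalone.
@dataclass
class BaseParams:
    cpus_per_worker: int = 1
    use_gpu: bool = False
    gpus_per_worker: Optional[int] = None

    def __post_init__(self):
        if self.gpus_per_worker and not self.use_gpu:
            raise ValueError("gpus_per_worker is set, but use_gpu is False. "
                             "use_gpu must be True if gpus_per_worker is set.")
        if self.use_gpu and isinstance(self.gpus_per_worker,
                                       int) and self.gpus_per_worker < 1:
            raise ValueError(
                f"gpus_per_worker must be >= 1: Got {self.gpus_per_worker}.")
        self.gpus_per_worker = self.gpus_per_worker or int(self.use_gpu)

# gpus_per_worker defaults to int(use_gpu): 1 with GPUs enabled, 0 without.
print(BaseParams(use_gpu=True).gpus_per_worker)   # → 1
print(BaseParams().gpus_per_worker)               # → 0

# Requesting GPUs per worker without use_gpu=True is rejected.
try:
    BaseParams(gpus_per_worker=2)
except ValueError as e:
    print("rejected:", e)
```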

horovod/ray/elastic.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -179,6 +179,7 @@ class ElasticRayExecutor:
         settings, use_gpu=True, cpus_per_slot=2)
     executor.start()
     executor.run(train_fn)
+    warning:: .. deprecated:: 0.25.0
     """

     @staticmethod
```
