Commit d261b5e

Support resurrecting blacklisted hosts (horovod#3319)
Signed-off-by: Abin Shahab <[email protected]>
1 parent cce4207 commit d261b5e

16 files changed, +218 -26 lines

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -12,6 +12,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
 - TensorFlow: Added in-place broadcasting of variables. ([#3128](https://github.com/horovod/horovod/pull/3128))
 
+- Added support for resurrecting blacklisted hosts. ([#3319](https://github.com/horovod/horovod/pull/3319))
+
 ### Changed
 
 - Moved to CMake version 3.13 with first-class CUDA language support and re-enabled parallelized builds. ([#3261](https://github.com/horovod/horovod/pull/3261))

docs/elastic.rst

Lines changed: 17 additions & 2 deletions
@@ -332,8 +332,23 @@ The maximum np can be used to cap the number of processes (to prevent over-utili
 as a reference point for learning rate scales and data partitions (in cases where these need to be held constant
 regardless of the current number of workers). If unspecified, maximum np also defaults to ``-np``.
 
-Instances that fail will be added to a blacklist, as they may have faulty hardware. Ranks that fail repeatedly
-will result in job failure, as it may be the case that the training process cannot make progress.
+Instances that fail will be added to a blacklist, as they may have faulty hardware. Hosts will remain in the blacklist
+for a configured cooldown period, after which they will be whitelisted again. This accounts for transient failures and
+for cases where the same host is added back to a job.
+Cooldown periods can be configured with the ``--blacklist-cooldown-range`` parameter like this:
+
+.. code-block:: bash
+
+    $ horovodrun -np 8 --blacklist-cooldown-range 10 100 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.py python train.py
+
+The above example configures the minimum cooldown period to 10 seconds and the maximum cooldown period to 100 seconds.
+The initial cooldown period would be 10 seconds. For repeated failures, the cooldown period grows with an exponential
+backoff delay (with a constant exponent of 2): 10s, 20s, 40s, and so on. However, the cooldown period is capped at
+100 seconds, regardless of the failure count. A random fraction of the cooldown lower limit is also added to the
+cooldown delay as jitter.
+The default behavior is to have no cooldown period, in which case blacklisted hosts remain blacklisted for the
+lifetime of the job.
+
+Ranks that fail repeatedly will result in job failure, as it may be the case that the training process cannot make progress.
 
 
 Running on Ray
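
The schedule described above is implemented by the cooldown logic added to horovod/runner/elastic/discovery.py (diffed below). As a minimal standalone sketch of that computation (the helper name cooldown_delay is ours, not Horovod API), the per-failure delay looks like this:

import random

def cooldown_delay(lower, upper, blacklist_count):
    # Exponential backoff: lower * 2**blacklist_count, plus a random
    # fraction of the lower limit as jitter, clamped to [lower, upper].
    delay = lower * (1 << blacklist_count) + random.uniform(0, 1) * lower
    return max(lower, min(upper, delay))

# With a (10, 100) range the delay roughly doubles per failure until the cap:
for count in range(1, 5):
    print(count, round(cooldown_delay(10, 100, count), 1))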

horovod/ray/elastic_v2.py

Lines changed: 15 additions & 1 deletion
@@ -1,4 +1,4 @@
-from typing import Callable, List, Any, Dict, Optional
+from typing import Callable, List, Any, Dict, Optional, Tuple
 import logging
 import ray.exceptions
 import socket
@@ -162,6 +162,11 @@ class ElasticParams(BaseParams):
         reset_limit (int): Maximum number of times that the training
             job can scale up or down the number of workers after
             which the job is terminated.
+        cooldown_range (Tuple[int, int]): Range (in seconds) a failing
+            host will remain in the blacklist.
+            Example: cooldown_range=(10, 100)
+            This sets the minimum cooldown period to 10 seconds,
+            and the maximum cooldown period to 100 seconds.
         elastic_timeout (int): Timeout for elastic initialisation after
             re-scaling the cluster. The default value is 600 seconds.
             Alternatively, the environment variable
@@ -177,6 +182,7 @@ class ElasticParams(BaseParams):
     min_workers: int = 1
     max_workers: int = None
     reset_limit: int = None
+    cooldown_range: Optional[Tuple[int, int]] = None
     elastic_timeout: int = 600
     override_discovery: bool = True
 
@@ -205,6 +211,11 @@ class ElasticAdapter(Adapter):
         reset_limit (int): Maximum number of times that the training
             job can scale up or down the number of workers after
             which the job is terminated.
+        cooldown_range (Tuple[int, int]): Range (in seconds) a failing
+            host will remain in the blacklist.
+            Example: cooldown_range=(10, 100)
+            This sets the minimum cooldown period to 10 seconds,
+            and the maximum cooldown period to 100 seconds.
         elastic_timeout (int): Timeout for elastic initialisation after
             re-scaling the cluster. The default value is 600 seconds.
             Alternatively, the environment variable
@@ -228,6 +239,7 @@ def __init__(self,
                  gpus_per_worker: Optional[int] = None,
                  override_discovery: bool=True,
                  reset_limit: int = None,
+                 cooldown_range: Optional[Tuple[int, int]] = None,
                  elastic_timeout: int = 600):
         self.settings = settings
         if override_discovery:
@@ -243,6 +255,7 @@ def __init__(self,
         self.max_workers = max_workers
         self.num_workers = min_workers
         self.reset_limit = reset_limit
+        self.cooldown_range = cooldown_range
         self.elastic_timeout = elastic_timeout
         self.driver = None
         self.rendezvous = None
@@ -275,6 +288,7 @@ def start(self,
             max_np=self.max_workers,
             timeout=self.elastic_timeout,
             reset_limit=self.reset_limit,
+            cooldown_range=self.cooldown_range,
             verbose=self.settings.verbose)
         handler = create_rendezvous_handler(self.driver)
         logger.debug("[ray] starting rendezvous")
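
For context, a hedged usage sketch of the new ElasticParams field; everything beyond the class and fields shown in this diff is illustrative:

from horovod.ray.elastic_v2 import ElasticParams

# Illustrative values only.
params = ElasticParams(
    min_workers=4,
    max_workers=12,
    cooldown_range=(10, 100),  # failing hosts stay blacklisted for 10-100s
)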

horovod/ray/runner.py

Lines changed: 2 additions & 0 deletions
@@ -253,6 +253,7 @@ def __init__(
             min_workers: int = None,
             max_workers: int = None,
             reset_limit: int = None,
+            cooldown_range: List[int] = None,
             elastic_timeout: int = 600,
             override_discovery: bool = True
     ):
@@ -268,6 +269,7 @@ def __init__(
             min_workers=min_workers,
             max_workers=max_workers,
             reset_limit=reset_limit,
+            cooldown_range=cooldown_range,
             elastic_timeout=elastic_timeout,
             override_discovery=override_discovery,
             cpus_per_worker=cpus_per_worker,
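
A hedged sketch of how this constructor might be invoked; the class name RayExecutor and the create_settings helper are assumptions, since only the constructor arguments appear in this diff:

from horovod.ray import RayExecutor

settings = RayExecutor.create_settings()  # assumed helper
executor = RayExecutor(
    settings,
    min_workers=4,
    max_workers=12,
    cooldown_range=[10, 100],  # forwarded to the elastic params above
)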

horovod/runner/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -54,6 +54,7 @@ def __init__(self):
         self.slots = None
         self.elastic_timeout = None
         self.reset_limit = None
+        self.cooldown_range = None
 
         # timeline arguments
         self.timeline_filename = None
@@ -98,6 +99,7 @@ def run(
         max_np=None,
         slots=None,
         reset_limit=None,
+        cooldown_range=None,
         hosts=None,
         hostfile=None,
         start_timeout=None,
@@ -133,6 +135,7 @@ def run(
         job after the initial registration. So a reset_limit of 0 would mean the job cannot change
         membership after its initial set of workers. A reset_limit of 1 means it can resize at most
         once, etc.
+    :param cooldown_range: Range of seconds (min, max) a failing host will remain in the blacklist.
 
     :param hosts: List of host names and the number of available slots
         for running processes on each, of the form: <hostname>:<slots>
@@ -192,6 +195,7 @@ def wrapped_func():
     hargs.max_np = max_np
     hargs.slots = slots
     hargs.reset_limit = reset_limit
+    hargs.cooldown_range = cooldown_range
     hargs.hosts = hosts
     hargs.hostfile = hostfile
     hargs.start_timeout = start_timeout
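
A hedged sketch of calling the Python run API with the new argument; np and min_np are assumed from the wider signature, as this hunk only shows max_np, slots, reset_limit, cooldown_range, hosts, hostfile, and start_timeout:

from horovod.runner import run

def train():
    pass  # elastic Horovod training function goes here

run(train,
    np=8,                      # assumed parameter
    min_np=4,                  # assumed parameter
    max_np=12,
    cooldown_range=(10, 100),  # blacklist cooldown between 10s and 100s
    hosts='host-1:4,host-2:4')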

horovod/runner/elastic/discovery.py

Lines changed: 90 additions & 10 deletions
@@ -15,20 +15,48 @@
 
 import io
 import logging
+import random
 import threading
-
+import time
 from collections import defaultdict
 
 from horovod.runner.common.util import safe_shell_exec
 from horovod.runner.elastic.worker import HostUpdateResult
 
+# The default lower bound for cooldown period. If a range is provided,
+# the provided lower limit must be at or above this lower bound
+DEFAULT_COOLDOWN_LOWER_LIMIT_SECONDS = 1
+# The default upper bound for cooldown period. If a range is provided,
+# the provided upper limit must be at or below this upper bound
+DEFAULT_COOLDOWN_UPPER_LIMIT_SECONDS = 1 * 60 * 60
 
 class HostState(object):
-    def __init__(self):
+
+    def __init__(self, cooldown_range=None):
         self._event = threading.Event()
 
-        # TODO(travis): blacklisted hosts should have a timeout period that increases with each failure
         self._blacklisted = False
+        self._blacklist_count = 0
+        if cooldown_range:
+            HostState._validate_cooldown_range(cooldown_range)
+            self._cooldown_lower_limit, self._cooldown_upper_limit = cooldown_range
+        else:
+            self._cooldown_lower_limit = -1
+            self._cooldown_upper_limit = -1
+        self._cooldown_period_end_ts = 0
+
+    @staticmethod
+    def _validate_cooldown_range(cooldown_range):
+        cooldown_lower_limit, cooldown_upper_limit = cooldown_range
+
+        if cooldown_lower_limit < DEFAULT_COOLDOWN_LOWER_LIMIT_SECONDS:
+            raise ValueError(f"Provided cooldown lower limit: {cooldown_lower_limit} "
+                             f"cannot be lower than default cooldown lower limit: {DEFAULT_COOLDOWN_LOWER_LIMIT_SECONDS}")
+
+        if cooldown_upper_limit > DEFAULT_COOLDOWN_UPPER_LIMIT_SECONDS:
+            raise ValueError(f"Provided cooldown upper limit: {cooldown_upper_limit} "
+                             f"cannot be higher than default cooldown upper limit: {DEFAULT_COOLDOWN_UPPER_LIMIT_SECONDS}")
 
     def get_event(self):
         if self._event.is_set():
@@ -39,13 +67,48 @@ def get_event(self):
     def set_event(self):
         self._event.set()
 
+    def _in_cooldown_period(self, current_time):
+        return self._cooldown_period_end_ts > current_time
+
+    def _set_cooldown_period(self, current_time):
+        if self._cooldown_lower_limit == -1 or self._cooldown_upper_limit == -1:
+            return
+        self._blacklist_count += 1
+
+        cooldown_delay = self._cooldown_lower_limit * (1 << self._blacklist_count) + (random.uniform(0, 1) * self._cooldown_lower_limit)
+        logging.debug(f"{self._blacklist_count}:{self._cooldown_period_end_ts} cooldown_delay: {cooldown_delay}")
+        # We need to ensure that the cooldown upper limit is the upper bound of the delay
+        cooldown_delta_seconds = max(self._cooldown_lower_limit, min(self._cooldown_upper_limit, cooldown_delay))
+
+        self._cooldown_period_end_ts = current_time + cooldown_delta_seconds
+        logging.debug(f"cooldown delta seconds: {cooldown_delta_seconds}")
+
     def blacklist(self):
+        """Moves this host to a blacklist, and starts the cooldown period."""
         self._blacklisted = True
+        now = time.time()
+        if self._in_cooldown_period(now):
+            return
+        self._set_cooldown_period(now)
         self.set_event()
 
+    def whitelist(self):
+        """Ends the cooldown period and moves this host out of the blacklist."""
+        self._cooldown_period_end_ts = 0
+        self._blacklisted = False
+
     def is_blacklisted(self):
+        """Checks if the host is in the blacklist."""
         return self._blacklisted
 
+    def is_resurrected(self):
+        """Checks if the host is in an expired cooldown period."""
+        if self._cooldown_period_end_ts > 0:
+            return not self._in_cooldown_period(time.time())
+        return False
+
 
 class DiscoveredHosts(object):
     def __init__(self, host_slots, host_assignment_order):
@@ -76,15 +139,17 @@ def update(self, hosts_state):
                 if not hosts_state[host].is_blacklisted()]
         return self
 
+    def __str__(self):
+        return f"slots: {self._host_slots} order: {self._host_assignment_order}"
+
 
 class HostManager(object):
-    def __init__(self, discovery):
+    def __init__(self, discovery, cooldown_range=None):
         self._current_hosts = DiscoveredHosts(host_slots={}, host_assignment_order=[])
-        self._hosts_state = defaultdict(HostState)
+        self._hosts_state = defaultdict(lambda: HostState(cooldown_range))
         self._discovery = discovery
 
     def update_available_hosts(self):
-        # TODO(travis): also check for hosts removed from the blacklist in the future
         def check_update(cur_host_slots, prev_host_slots):
             res = HostUpdateResult.no_update
@@ -103,17 +168,32 @@ def check_update(cur_host_slots, prev_host_slots):
                 elif cur_host_slots[h] < prev_host_slots[h]:
                     # h has removed some slots
                     res |= HostUpdateResult.removed
+                elif self._hosts_state[h].is_resurrected():
+                    res |= HostUpdateResult.added
             return res
 
         prev_host_slots = self._current_hosts.host_slots
         prev_host_assignment_order = self._current_hosts.host_assignment_order
         host_slots = self._discovery.find_available_hosts_and_slots()
-        if prev_host_slots != host_slots:
-            available_hosts = set([host for host in host_slots.keys() if not self._hosts_state[host].is_blacklisted()])
+
+        def whitelist_all_hosts():
+            for host in host_slots.keys():
+                if self._hosts_state[host].is_resurrected():
+                    self._hosts_state[host].whitelist()
+
+        def has_resurrected_hosts():
+            resurrected_hosts = [host for host in host_slots.keys() if self._hosts_state[host].is_resurrected()]
+            return len(resurrected_hosts) > 0
+
+        if prev_host_slots != host_slots or has_resurrected_hosts():
+            available_hosts = set([host for host in host_slots.keys()
+                                   if not (self._hosts_state[host].is_blacklisted() and not self._hosts_state[host].is_resurrected())])
             host_assignment_order = HostManager.order_available_hosts(available_hosts, prev_host_assignment_order)
             self._current_hosts = DiscoveredHosts(host_slots=host_slots,
                                                   host_assignment_order=host_assignment_order)
-            return check_update(self._current_hosts.host_slots, prev_host_slots)
+            host_update_state = check_update(self._current_hosts.host_slots, prev_host_slots)
+            whitelist_all_hosts()
+            return host_update_state
         else:
             return HostUpdateResult.no_update
@@ -123,7 +203,7 @@ def current_hosts(self):
 
     def blacklist(self, host):
         if not self._hosts_state[host].is_blacklisted():
-            logging.warning('blacklist failing host: {}'.format(host))
+            logging.info('blacklist failing host: {}'.format(host))
         self._hosts_state[host].blacklist()
 
     def is_blacklisted(self, host):
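
To make the new HostState lifecycle concrete, a small illustrative exercise (values are examples; the import path is the file shown above):

from horovod.runner.elastic.discovery import HostState

state = HostState(cooldown_range=(10, 100))
state.blacklist()                  # first failure: starts a cooldown period
assert state.is_blacklisted()      # host is out of rotation
assert not state.is_resurrected()  # still inside the cooldown window
# Once the cooldown expires, is_resurrected() returns True and
# HostManager.update_available_hosts() whitelists the host again.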

horovod/runner/elastic/driver.py

Lines changed: 2 additions & 2 deletions
@@ -66,9 +66,9 @@ def get_results(self):
 
 
 class ElasticDriver(object):
-    def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, verbose=0):
+    def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, cooldown_range=None, verbose=0):
         self._rendezvous = rendezvous
-        self._host_manager = HostManager(discovery)
+        self._host_manager = HostManager(discovery, cooldown_range)
         self._min_np = min_np
         self._max_np = max_np
         self._verbose = verbose

horovod/runner/elastic/settings.py

Lines changed: 4 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 
 class ElasticSettings(BaseSettings):
-    def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kwargs):
+    def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, cooldown_range=None, **kwargs):
         """
         :param discovery: object used to detect and manage available hosts
         :type discovery: horovod.runner.elastic.discovery.HostDiscovery
@@ -29,13 +29,16 @@ def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kw
         :type elastic_timeout: int
         :param reset_limit: maximum number of resets after which the job is terminated
         :type reset_limit: int
+        :param cooldown_range: range of seconds (min, max) a failing host will remain in the blacklist
+        :type cooldown_range: tuple
         """
         super(ElasticSettings, self).__init__(elastic=True, **kwargs)
         self.discovery = discovery
         self.min_np = min_np
         self.max_np = max_np
         self.elastic_timeout = elastic_timeout
         self.reset_limit = reset_limit
+        self.cooldown_range = cooldown_range
 
         # we do not serialize the discovery instance
         # it is not needed on the worker and might not be serializable
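
A hedged construction sketch of these settings; the discovery object and any BaseSettings keyword arguments are assumed:

from horovod.runner.elastic.settings import ElasticSettings

settings = ElasticSettings(
    discovery=my_discovery,    # assumed HostDiscovery instance
    min_np=4,
    max_np=12,
    elastic_timeout=600,
    reset_limit=3,
    cooldown_range=(10, 100),
)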

horovod/runner/gloo_run.py

Lines changed: 1 addition & 0 deletions
@@ -307,6 +307,7 @@ def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfa
                            settings.min_np, settings.max_np,
                            timeout=settings.elastic_timeout,
                            reset_limit=settings.reset_limit,
+                           cooldown_range=settings.cooldown_range,
                            verbose=settings.verbose)
 
     handler = create_rendezvous_handler(driver)
