Skip to content

Commit e037932

Browse files
authored
[Providers][Slack] Add automatic 429‑retry to channel‑lookup logic (apache#51265)
* add sleep interval in operator * add retry when get conversations_list * add unit test & function description
1 parent 1088a9c commit e037932

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

providers/slack/src/airflow/providers/slack/hooks/slack.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from __future__ import annotations
1919

2020
import json
21+
import time
2122
import warnings
2223
from collections.abc import Sequence
2324
from functools import cached_property
@@ -28,7 +29,7 @@
2829
from slack_sdk.errors import SlackApiError
2930
from typing_extensions import NotRequired
3031

31-
from airflow.exceptions import AirflowNotFoundException
32+
from airflow.exceptions import AirflowException, AirflowNotFoundException
3233
from airflow.hooks.base import BaseHook
3334
from airflow.providers.slack.utils import ConnectionExtraConfig
3435
from airflow.utils.helpers import exactly_one
@@ -291,7 +292,7 @@ def get_channel_id(self, channel_name: str) -> str:
291292
"""
292293
next_cursor = None
293294
while not (channel_id := self._channels_mapping.get(channel_name)):
294-
res = self.client.conversations_list(cursor=next_cursor, types="public_channel,private_channel")
295+
res = self._call_conversations_list(cursor=next_cursor)
295296
if TYPE_CHECKING:
296297
# Slack SDK response type too broad, this should make mypy happy
297298
assert isinstance(res.data, dict)
@@ -308,6 +309,37 @@ def get_channel_id(self, channel_name: str) -> str:
308309
raise LookupError(msg)
309310
return channel_id
310311

312+
def _call_conversations_list(self, cursor: str | None = None):
313+
"""
314+
Call ``conversations.list`` with automatic 429-retry.
315+
316+
.. versionchanged:: 3.0.0
317+
Automatically retries on 429 responses (up to 5 times, honouring *Retry-After* header).
318+
319+
:param cursor: Pagination cursor returned by the previous ``conversations.list`` call.
320+
Pass ``None`` (default) to start from the first page.
321+
:raises AirflowException: If the method hits the rate-limit 5 times in a row.
322+
:raises SlackApiError: Propagated when errors other than 429 occur.
323+
:return: Slack SDK response for the page requested.
324+
"""
325+
max_retries = 5
326+
for attempt in range(max_retries):
327+
try:
328+
return self.client.conversations_list(cursor=cursor, types="public_channel,private_channel")
329+
except SlackApiError as e:
330+
if e.response.status_code == 429 and attempt < max_retries:
331+
retry_after = int(e.response.headers.get("Retry-After", 30))
332+
self.log.warning(
333+
"Rate limit hit. Retrying in %s seconds. Attempt %s/%s",
334+
retry_after,
335+
attempt + 1,
336+
max_retries,
337+
)
338+
time.sleep(retry_after)
339+
else:
340+
raise
341+
raise AirflowException("Max retries reached for conversations.list")
342+
311343
def test_connection(self):
312344
"""
313345
Tests the Slack API connection.

providers/slack/tests/unit/slack/hooks/test_slack.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from slack_sdk.http_retry.builtin_handlers import ConnectionErrorRetryHandler, RateLimitErrorRetryHandler
2727
from slack_sdk.web.slack_response import SlackResponse
2828

29-
from airflow.exceptions import AirflowNotFoundException
29+
from airflow.exceptions import AirflowException, AirflowNotFoundException
3030
from airflow.models.connection import Connection
3131
from airflow.providers.slack.hooks.slack import SlackHook
3232

@@ -88,6 +88,13 @@ def fake_slack_response(*, data: dict | bytes, status_code: int = 200, **kwargs)
8888

8989
return SlackResponse(status_code=status_code, data=data, **kwargs)
9090

91+
@staticmethod
92+
def make_429():
93+
resp = mock.MagicMock()
94+
resp.status_code = 429
95+
resp.headers = {"Retry-After": "1"}
96+
return SlackApiError("ratelimited", response=resp)
97+
9198
@pytest.mark.parametrize(
9299
"conn_id",
93100
[
@@ -389,6 +396,28 @@ def test_get_channel_id(self, mocked_client):
389396
with pytest.raises(LookupError, match="Unable to find slack channel"):
390397
hook.get_channel_id("troubleshooting")
391398

399+
def test_call_conversations_list_retries_then_succeeds(self, monkeypatch):
400+
ok_resp = self.fake_slack_response(data={"channels": []})
401+
monkeypatch.setattr(
402+
"airflow.providers.slack.hooks.slack.WebClient",
403+
lambda **_: mock.MagicMock(
404+
conversations_list=mock.Mock(side_effect=[self.make_429(), self.make_429(), ok_resp])
405+
),
406+
)
407+
with mock.patch("time.sleep") as mocked_sleep:
408+
hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID)
409+
res = hook._call_conversations_list()
410+
assert res is ok_resp
411+
assert mocked_sleep.call_count == 2
412+
413+
def test_call_conversations_list_exceeds_max(self, monkeypatch):
414+
monkeypatch.setattr(
415+
"airflow.providers.slack.hooks.slack.WebClient",
416+
lambda **_: mock.MagicMock(conversations_list=mock.Mock(side_effect=[self.make_429()] * 5)),
417+
)
418+
with pytest.raises(AirflowException, match="Max retries"):
419+
SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID)._call_conversations_list()
420+
392421
def test_send_file_v2(self, mocked_client):
393422
SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID).send_file_v2(
394423
channel_id="C00000000", file_uploads={"file": "/foo/bar/file.txt", "filename": "foo.txt"}

0 commit comments

Comments
 (0)