Skip to content

Commit

Permalink
Add retry on 429 error code
Browse files Browse the repository at this point in the history
  • Loading branch information
hovaesco authored and hashhar committed Sep 2, 2024
1 parent 97c5a7f commit 9d2567b
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
27 changes: 27 additions & 0 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,33 @@ def test_5XX_error_retry(status_code, attempts, monkeypatch):
assert post_retry.retry_count == attempts


def test_429_error_retry(monkeypatch):
http_resp = TrinoRequest.http.Response()
http_resp.status_code = 429
http_resp.headers["Retry-After"] = 1

post_retry = RetryRecorder(result=http_resp)
monkeypatch.setattr(TrinoRequest.http.Session, "post", post_retry)

get_retry = RetryRecorder(result=http_resp)
monkeypatch.setattr(TrinoRequest.http.Session, "get", get_retry)

req = TrinoRequest(
host="coordinator",
port=8080,
client_session=ClientSession(
user="test",
),
max_attempts=3
)

req.post("URL")
assert post_retry.retry_count == 3

req.get("URL")
assert post_retry.retry_count == 3


@pytest.mark.parametrize("status_code", [
501
])
Expand Down
32 changes: 29 additions & 3 deletions trino/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import urllib.parse
import warnings
from dataclasses import dataclass
from datetime import datetime
from email.utils import parsedate_to_datetime
from time import sleep
from typing import Any, Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -347,6 +349,14 @@ def retry(self, func, args, kwargs, err, attempt):
sleep(delay)


class _RetryAfterSleep(object):
def __init__(self, retry_after_header):
self._retry_after_header = retry_after_header

def retry(self):
sleep(self._retry_after_header)


class TrinoRequest(object):
"""
Manage the HTTP requests of a Trino query.
Expand Down Expand Up @@ -523,9 +533,9 @@ def max_attempts(self, value) -> None:
self._handle_retry,
handled_exceptions=self._exceptions,
conditions=(
# need retry when there is no exception but the status code is 502, 503, or 504
# need retry when there is no exception but the status code is 429, 502, 503, or 504
lambda response: getattr(response, "status_code", None)
in (502, 503, 504),
in (429, 502, 503, 504),
),
max_attempts=self._max_attempts,
)
Expand Down Expand Up @@ -887,7 +897,12 @@ def decorated(*args, **kwargs):
try:
result = func(*args, **kwargs)
if any(guard(result) for guard in conditions):
handle_retry.retry(func, args, kwargs, None, attempt)
if result.status_code == 429 and "Retry-After" in result.headers:
retry_after = _parse_retry_after_header(result.headers.get("Retry-After"))
handle_retry_sleep = _RetryAfterSleep(retry_after)
handle_retry_sleep.retry()
else:
handle_retry.retry(func, args, kwargs, None, attempt)
continue
return result
except Exception as err:
Expand All @@ -904,3 +919,14 @@ def decorated(*args, **kwargs):
return decorated

return wrapper


def _parse_retry_after_header(retry_after):
if isinstance(retry_after, int):
return retry_after
elif isinstance(retry_after, str) and retry_after.isdigit():
return int(retry_after)
else:
retry_date = parsedate_to_datetime(retry_after)
now = datetime.utcnow()
return (retry_date - now).total_seconds()

0 comments on commit 9d2567b

Please sign in to comment.