Skip to content

Commit 4f1392d

Browse files
committed
sequential, threaded & async HTTP clients using HTTPX
1 parent 7985fda commit 4f1392d

12 files changed

+273
-226
lines changed

20-futures/getflags/README.adoc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Contents:
1414
* <<macos_certificates>>
1515
1616
[[server_setup]]
17-
== Setting up a test server
17+
== Setting up test servers
1818

1919
If you don't already have a local HTTP server for testing,
2020
here are the steps to experiment with the `flags2*` examples
@@ -25,7 +25,7 @@ using just the Python ≥ 3.9 distribution:
2525
. Unzip the _flags.zip_ file, creating a _flags_ directory at _20-futures/getflags/flags/_.
2626
. Open a second shell, go to the _20-futures/getflags/_ directory and run `python3 -m http.server`. This will start a `ThreadingHTTPServer` listening to port 8000, serving the local files. If you open the URL http://localhost:8000/flags/[http://localhost:8000/flags/] with your browser, you'll see a long list of directories named with two-letter country codes from `ad/` to `zw/`.
2727
. Now you can go back to the first shell and run the _flags2*.py_ examples with the default `--server LOCAL` option.
28-
. To test with the `--server DELAY` option, go to _20-futures/getflags/_ and run `python3 slow_server.py`. This binds to port 8001 by default. It will add a .5s delay before each response.
28+
. To test with the `--server DELAY` option, go to _20-futures/getflags/_ and run `python3 slow_server.py`. This binds to port 8001 by default. It will add a random delay of .5s to 5s before each response.
2929
. To test with the `--server ERROR` option, go to _20-futures/getflags/_ and run `python3 slow_server.py 8002 --error-rate .25`.
3030
Each request will have a 25% probability of getting a
3131
https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/418[418 I'm a teapot] response,
@@ -86,7 +86,7 @@ optional arguments:
8686
All arguments are optional. The most important arguments are discussed next.
8787

8888
One option you can't ignore is `-s/--server`: it lets you choose which HTTP server and base URL will be used in the test.
89-
You can pass one of four strings to determine where the script will look for the flags (the strings are case insensitive):
89+
You can pass one of four labels to determine where the script will look for the flags (the labels are case-insensitive):
9090

9191
`LOCAL`:: Use `http://localhost:8000/flags`; this is the default.
9292
You should configure a local HTTP server to answer at port 8000. See <<server_setup>> for instructions.

20-futures/getflags/flags.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,35 +21,38 @@
2121
from pathlib import Path
2222
from typing import Callable
2323

24-
import requests # <1>
24+
import httpx # <1>
2525

2626
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
2727
'MX PH VN ET EG DE IR TR CD FR').split() # <2>
2828

29-
BASE_URL = 'http://fluentpython.com/data/flags' # <3>
29+
BASE_URL = 'https://www.fluentpython.com/data/flags' # <3>
3030
DEST_DIR = Path('downloaded') # <4>
3131

3232
def save_flag(img: bytes, filename: str) -> None: # <5>
3333
(DEST_DIR / filename).write_bytes(img)
3434

3535
def get_flag(cc: str) -> bytes: # <6>
3636
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
37-
resp = requests.get(url)
37+
resp = httpx.get(url, timeout=6.1, # <7>
38+
follow_redirects=True) # <8>
39+
resp.raise_for_status() # <9>
3840
return resp.content
3941

40-
def download_many(cc_list: list[str]) -> int: # <7>
41-
for cc in sorted(cc_list): # <8>
42+
def download_many(cc_list: list[str]) -> int: # <10>
43+
for cc in sorted(cc_list): # <11>
4244
image = get_flag(cc)
4345
save_flag(image, f'{cc}.gif')
44-
print(cc, end=' ', flush=True) # <9>
46+
print(cc, end=' ', flush=True) # <12>
4547
return len(cc_list)
4648

47-
def main(downloader: Callable[[list[str]], int]) -> None: # <10>
48-
t0 = time.perf_counter() # <11>
49+
def main(downloader: Callable[[list[str]], int]) -> None: # <13>
50+
DEST_DIR.mkdir(exist_ok=True) # <14>
51+
t0 = time.perf_counter() # <15>
4952
count = downloader(POP20_CC)
5053
elapsed = time.perf_counter() - t0
5154
print(f'\n{count} downloads in {elapsed:.2f}s')
5255

5356
if __name__ == '__main__':
54-
main(download_many) # <12>
57+
main(download_many) # <16>
5558
# end::FLAGS_PY[]

20-futures/getflags/flags2_asyncio.py

Lines changed: 46 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,91 +8,94 @@
88
# tag::FLAGS2_ASYNCIO_TOP[]
99
import asyncio
1010
from collections import Counter
11+
from http import HTTPStatus
12+
from pathlib import Path
1113

12-
import aiohttp
14+
import httpx
1315
import tqdm # type: ignore
1416

15-
from flags2_common import main, HTTPStatus, Result, save_flag
17+
from flags2_common import main, DownloadStatus, save_flag
1618

1719
# default set low to avoid errors from remote site, such as
1820
# 503 - Service Temporarily Unavailable
1921
DEFAULT_CONCUR_REQ = 5
2022
MAX_CONCUR_REQ = 1000
2123

22-
23-
class FetchError(Exception): # <1>
24-
def __init__(self, country_code: str):
25-
self.country_code = country_code
26-
27-
28-
async def get_flag(session: aiohttp.ClientSession, # <2>
24+
async def get_flag(session: httpx.AsyncClient, # <2>
2925
base_url: str,
3026
cc: str) -> bytes:
3127
url = f'{base_url}/{cc}/{cc}.gif'.lower()
32-
async with session.get(url) as resp:
33-
if resp.status == 200:
34-
return await resp.read()
35-
else:
36-
resp.raise_for_status() # <3>
37-
return bytes()
28+
resp = await session.get(url, timeout=3.1, follow_redirects=True) # <3>
29+
resp.raise_for_status()
30+
return resp.content
3831

39-
async def download_one(session: aiohttp.ClientSession,
32+
async def download_one(session: httpx.AsyncClient,
4033
cc: str,
4134
base_url: str,
4235
semaphore: asyncio.Semaphore, # <4>
43-
verbose: bool) -> Result:
36+
verbose: bool) -> DownloadStatus:
4437
try:
4538
async with semaphore: # <5>
4639
image = await get_flag(session, base_url, cc)
47-
except aiohttp.ClientResponseError as exc:
48-
if exc.status == 404: # <6>
49-
status = HTTPStatus.not_found
50-
msg = 'not found'
40+
except httpx.HTTPStatusError as exc: # <4>
41+
res = exc.response
42+
if res.status_code == HTTPStatus.NOT_FOUND:
43+
status = DownloadStatus.NOT_FOUND # <5>
44+
msg = f'not found: {res.url}'
5145
else:
52-
raise FetchError(cc) from exc # <7>
46+
raise
47+
5348
else:
54-
save_flag(image, f'{cc}.gif')
55-
status = HTTPStatus.ok
49+
await asyncio.to_thread(save_flag, image, f'{cc}.gif')
50+
status = DownloadStatus.OK
5651
msg = 'OK'
5752
if verbose and msg:
5853
print(cc, msg)
59-
return Result(status, cc)
54+
return status
6055
# end::FLAGS2_ASYNCIO_TOP[]
6156

6257
# tag::FLAGS2_ASYNCIO_START[]
6358
async def supervisor(cc_list: list[str],
6459
base_url: str,
6560
verbose: bool,
66-
concur_req: int) -> Counter[HTTPStatus]: # <1>
67-
counter: Counter[HTTPStatus] = Counter()
61+
concur_req: int) -> Counter[DownloadStatus]: # <1>
62+
counter: Counter[DownloadStatus] = Counter()
6863
semaphore = asyncio.Semaphore(concur_req) # <2>
69-
async with aiohttp.ClientSession() as session:
64+
async with httpx.AsyncClient() as session:
7065
to_do = [download_one(session, cc, base_url, semaphore, verbose)
7166
for cc in sorted(cc_list)] # <3>
7267
to_do_iter = asyncio.as_completed(to_do) # <4>
7368
if not verbose:
7469
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
7570
for coro in to_do_iter: # <6>
7671
try:
77-
res = await coro # <7>
78-
except FetchError as exc: # <8>
79-
country_code = exc.country_code # <9>
80-
try:
81-
error_msg = exc.__cause__.message # type: ignore # <10>
82-
except AttributeError:
83-
error_msg = 'Unknown cause' # <11>
84-
if verbose and error_msg:
85-
print(f'*** Error for {country_code}: {error_msg}')
86-
status = HTTPStatus.error
87-
else:
88-
status = res.status
89-
counter[status] += 1 # <12>
90-
return counter # <13>
72+
status = await coro # <7>
73+
except httpx.HTTPStatusError as exc: # <8>
74+
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
75+
error_msg = error_msg.format(resp=exc.response)
76+
error = exc
77+
except httpx.RequestError as exc: # <9>
78+
error_msg = f'{exc} {type(exc)}'.strip()
79+
error = exc
80+
except KeyboardInterrupt: # <10>
81+
break
82+
else: # <11>
83+
error = None
84+
85+
if error:
86+
status = DownloadStatus.ERROR # <12>
87+
if verbose:
88+
url = str(error.request.url) # <13>
89+
cc = Path(url).stem.upper() # <14>
90+
print(f'{cc} error: {error_msg}')
91+
counter[status] += 1
92+
93+
return counter
9194

9295
def download_many(cc_list: list[str],
9396
base_url: str,
9497
verbose: bool,
95-
concur_req: int) -> Counter[HTTPStatus]:
98+
concur_req: int) -> Counter[DownloadStatus]:
9699
coro = supervisor(cc_list, base_url, verbose, concur_req)
97100
counts = asyncio.run(coro) # <14>
98101

20-futures/getflags/flags2_asyncio_executor.py

Lines changed: 57 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,108 +2,107 @@
22

33
"""Download flags of countries (with error handling).
44
5-
asyncio async/await version using run_in_executor for save_flag.
5+
asyncio async/await version
66
77
"""
8-
8+
# tag::FLAGS2_ASYNCIO_TOP[]
99
import asyncio
1010
from collections import Counter
11+
from http import HTTPStatus
12+
from pathlib import Path
1113

12-
import aiohttp
14+
import httpx
1315
import tqdm # type: ignore
1416

15-
from flags2_common import main, HTTPStatus, Result, save_flag
17+
from flags2_common import main, DownloadStatus, save_flag
1618

1719
# default set low to avoid errors from remote site, such as
1820
# 503 - Service Temporarily Unavailable
1921
DEFAULT_CONCUR_REQ = 5
2022
MAX_CONCUR_REQ = 1000
2123

2224

23-
class FetchError(Exception):
24-
def __init__(self, country_code: str):
25-
self.country_code = country_code
26-
27-
28-
async def get_flag(session: aiohttp.ClientSession,
25+
async def get_flag(session: httpx.AsyncClient, # <2>
2926
base_url: str,
3027
cc: str) -> bytes:
3128
url = f'{base_url}/{cc}/{cc}.gif'.lower()
32-
async with session.get(url) as resp:
33-
if resp.status == 200:
34-
return await resp.read()
35-
else:
36-
resp.raise_for_status()
37-
return bytes()
29+
resp = await session.get(url, timeout=3.1, follow_redirects=True) # <3>
30+
resp.raise_for_status()
31+
return resp.content
3832

39-
# tag::FLAGS2_ASYNCIO_EXECUTOR[]
40-
async def download_one(session: aiohttp.ClientSession,
33+
34+
async def download_one(session: httpx.AsyncClient,
4135
cc: str,
4236
base_url: str,
4337
semaphore: asyncio.Semaphore,
44-
verbose: bool) -> Result:
38+
verbose: bool) -> DownloadStatus:
4539
try:
4640
async with semaphore:
4741
image = await get_flag(session, base_url, cc)
48-
except aiohttp.ClientResponseError as exc:
49-
if exc.status == 404:
50-
status = HTTPStatus.not_found
51-
msg = 'not found'
42+
except httpx.HTTPStatusError as exc:
43+
res = exc.response
44+
if res.status_code == HTTPStatus.NOT_FOUND:
45+
status = DownloadStatus.NOT_FOUND
46+
msg = f'not found: {res.url}'
5247
else:
53-
raise FetchError(cc) from exc
48+
raise
5449
else:
55-
loop = asyncio.get_running_loop() # <1>
56-
loop.run_in_executor(None, # <2>
57-
save_flag, image, f'{cc}.gif') # <3>
58-
status = HTTPStatus.ok
50+
# tag::FLAGS2_ASYNCIO_EXECUTOR[]
51+
loop = asyncio.get_running_loop() # <1>
52+
loop.run_in_executor(None, save_flag, # <2>
53+
image, f'{cc}.gif') # <3>
54+
# end::FLAGS2_ASYNCIO_EXECUTOR[]
55+
status = DownloadStatus.OK
5956
msg = 'OK'
6057
if verbose and msg:
6158
print(cc, msg)
62-
return Result(status, cc)
63-
# end::FLAGS2_ASYNCIO_EXECUTOR[]
59+
return status
6460

6561
async def supervisor(cc_list: list[str],
6662
base_url: str,
6763
verbose: bool,
68-
concur_req: int) -> Counter[HTTPStatus]:
69-
counter: Counter[HTTPStatus] = Counter()
70-
semaphore = asyncio.Semaphore(concur_req)
71-
async with aiohttp.ClientSession() as session:
64+
concur_req: int) -> Counter[DownloadStatus]: # <1>
65+
counter: Counter[DownloadStatus] = Counter()
66+
semaphore = asyncio.Semaphore(concur_req) # <2>
67+
async with httpx.AsyncClient() as session:
7268
to_do = [download_one(session, cc, base_url, semaphore, verbose)
73-
for cc in sorted(cc_list)]
74-
75-
to_do_iter = asyncio.as_completed(to_do)
69+
for cc in sorted(cc_list)] # <3>
70+
to_do_iter = asyncio.as_completed(to_do) # <4>
7671
if not verbose:
77-
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
78-
for coro in to_do_iter:
72+
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
73+
for coro in to_do_iter: # <6>
7974
try:
80-
res = await coro
81-
except FetchError as exc:
82-
country_code = exc.country_code
83-
try:
84-
error_msg = exc.__cause__.message # type: ignore
85-
except AttributeError:
86-
error_msg = 'Unknown cause'
87-
if verbose and error_msg:
88-
print(f'*** Error for {country_code}: {error_msg}')
89-
status = HTTPStatus.error
90-
else:
91-
status = res.status
92-
93-
counter[status] += 1
94-
95-
return counter
96-
75+
status = await coro # <7>
76+
except httpx.HTTPStatusError as exc: # <13>
77+
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
78+
error_msg = error_msg.format(resp=exc.response)
79+
error = exc
80+
except httpx.RequestError as exc: # <15>
81+
error_msg = f'{exc} {type(exc)}'.strip()
82+
error = exc
83+
except KeyboardInterrupt: # <7>
84+
break
85+
else: # <8>
86+
error = None
87+
88+
if error:
89+
status = DownloadStatus.ERROR # <9>
90+
if verbose: # <11>
91+
cc = Path(str(error.request.url)).stem.upper()
92+
print(f'{cc} error: {error_msg}')
93+
counter[status] += 1 # <10>
94+
95+
return counter # <12>
9796

9897
def download_many(cc_list: list[str],
9998
base_url: str,
10099
verbose: bool,
101-
concur_req: int) -> Counter[HTTPStatus]:
100+
concur_req: int) -> Counter[DownloadStatus]:
102101
coro = supervisor(cc_list, base_url, verbose, concur_req)
103102
counts = asyncio.run(coro) # <14>
104103

105104
return counts
106105

107-
108106
if __name__ == '__main__':
109107
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
108+
# end::FLAGS2_ASYNCIO_START[]

0 commit comments

Comments
 (0)