Skip to content

Commit 31e2c4f

Browse files
committed
flags2 await refactoring start
1 parent 22cfc8d commit 31e2c4f

File tree

1 file changed

+118
-0
lines changed

1 file changed

+118
-0
lines changed

17-futures/countries/flags2_await.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Download flags of countries (with error handling).
2+
3+
asyncio version
4+
5+
Sample run::
6+
7+
$ python3 flags2_asyncio.py -s ERROR -e -m 200
8+
ERROR site: http://localhost:8003/flags
9+
Searching for 676 flags: from AA to ZZ
10+
200 concurrent connections will be used.
11+
--------------------
12+
146 flags downloaded.
13+
363 not found.
14+
167 errors.
15+
Elapsed time: 2.59s
16+
17+
"""
18+
# BEGIN FLAGS2_ASYNCIO_TOP
19+
import asyncio
20+
import collections
21+
22+
import aiohttp
23+
from aiohttp import web
24+
import tqdm
25+
26+
from flags2_common import main, HTTPStatus, Result, save_flag
27+
28+
# default set low to avoid errors from remote site, such as
29+
# 503 - Service Temporarily Unavailable
30+
DEFAULT_CONCUR_REQ = 5
31+
MAX_CONCUR_REQ = 1000
32+
33+
34+
class FetchError(Exception): # <1>
35+
def __init__(self, country_code):
36+
self.country_code = country_code
37+
38+
39+
@asyncio.coroutine
40+
def get_flag(base_url, cc): # <2>
41+
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
42+
resp = yield from aiohttp.request('GET', url)
43+
if resp.status == 200:
44+
image = yield from resp.read()
45+
return image
46+
elif resp.status == 404:
47+
raise web.HTTPNotFound()
48+
else:
49+
raise aiohttp.HttpProcessingError(
50+
code=resp.status, message=resp.reason,
51+
headers=resp.headers)
52+
53+
54+
@asyncio.coroutine
55+
def download_one(cc, base_url, semaphore, verbose): # <3>
56+
try:
57+
with (yield from semaphore): # <4>
58+
image = yield from get_flag(base_url, cc) # <5>
59+
except web.HTTPNotFound: # <6>
60+
status = HTTPStatus.not_found
61+
msg = 'not found'
62+
except Exception as exc:
63+
raise FetchError(cc) from exc # <7>
64+
else:
65+
save_flag(image, cc.lower() + '.gif') # <8>
66+
status = HTTPStatus.ok
67+
msg = 'OK'
68+
69+
if verbose and msg:
70+
print(cc, msg)
71+
72+
return Result(status, cc)
73+
# END FLAGS2_ASYNCIO_TOP
74+
75+
# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY
76+
@asyncio.coroutine
77+
def downloader_coro(cc_list, base_url, verbose, concur_req): # <1>
78+
counter = collections.Counter()
79+
semaphore = asyncio.Semaphore(concur_req) # <2>
80+
to_do = [download_one(cc, base_url, semaphore, verbose)
81+
for cc in sorted(cc_list)] # <3>
82+
83+
to_do_iter = asyncio.as_completed(to_do) # <4>
84+
if not verbose:
85+
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
86+
for future in to_do_iter: # <6>
87+
try:
88+
res = yield from future # <7>
89+
except FetchError as exc: # <8>
90+
country_code = exc.country_code # <9>
91+
try:
92+
error_msg = exc.__cause__.args[0] # <10>
93+
except IndexError:
94+
error_msg = exc.__cause__.__class__.__name__ # <11>
95+
if verbose and error_msg:
96+
msg = '*** Error for {}: {}'
97+
print(msg.format(country_code, error_msg))
98+
status = HTTPStatus.error
99+
else:
100+
status = res.status
101+
102+
counter[status] += 1 # <12>
103+
104+
return counter # <13>
105+
106+
107+
def download_many(cc_list, base_url, verbose, concur_req):
108+
loop = asyncio.get_event_loop()
109+
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
110+
counts = loop.run_until_complete(coro) # <14>
111+
loop.close() # <15>
112+
113+
return counts
114+
115+
116+
if __name__ == '__main__':
117+
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
118+
# END FLAGS2_ASYNCIO_DOWNLOAD_MANY

0 commit comments

Comments
 (0)