forked from QAtestinc/python-slack-sdk
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathasync_internal_utils.py
201 lines (173 loc) · 5.94 KB
/
async_internal_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import asyncio
import json
from asyncio import AbstractEventLoop
from logging import Logger
from ssl import SSLContext
from typing import Union, Optional, BinaryIO, List, Dict
from urllib.parse import urljoin
import aiohttp
from aiohttp import FormData, BasicAuth, ClientSession
from slack.errors import SlackRequestError, SlackApiError
from slack.web import get_user_agent
def _get_event_loop() -> AbstractEventLoop:
"""Retrieves the event loop or creates a new one."""
try:
return asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def _get_url(base_url: str, api_method: str) -> str:
"""Joins the base Slack URL and an API method to form an absolute URL.
Args:
base_url (str): The base URL
api_method (str): The Slack Web API method. e.g. 'chat.postMessage'
Returns:
The absolute API URL.
e.g. 'https://www.slack.com/api/chat.postMessage'
"""
return urljoin(base_url, api_method)
def _get_headers(
*,
headers: dict,
token: Optional[str],
has_json: bool,
has_files: bool,
request_specific_headers: Optional[dict],
) -> Dict[str, str]:
"""Constructs the headers need for a request.
Args:
has_json (bool): Whether or not the request has json.
has_files (bool): Whether or not the request has files.
request_specific_headers (dict): Additional headers specified by the user for a specific request.
Returns:
The headers dictionary.
e.g. {
'Content-Type': 'application/json;charset=utf-8',
'Authorization': 'Bearer xoxb-1234-1243',
'User-Agent': 'Python/3.6.8 slack/2.1.0 Darwin/17.7.0'
}
"""
final_headers = {
"User-Agent": get_user_agent(),
"Content-Type": "application/x-www-form-urlencoded",
}
if token:
final_headers.update({"Authorization": "Bearer {}".format(token)})
if headers is None:
headers = {}
# Merge headers specified at client initialization.
final_headers.update(headers)
# Merge headers specified for a specific request. e.g. oauth.access
if request_specific_headers:
final_headers.update(request_specific_headers)
if has_json:
final_headers.update({"Content-Type": "application/json;charset=utf-8"})
if has_files:
# These are set automatically by the aiohttp library.
final_headers.pop("Content-Type", None)
return final_headers
def _build_req_args(
*,
token: Optional[str],
http_verb: str,
files: dict,
data: Union[dict, FormData],
params: dict,
json: dict, # skipcq: PYL-W0621
headers: dict,
auth: dict,
ssl: Optional[SSLContext],
proxy: Optional[str],
) -> dict:
has_json = json is not None
has_files = files is not None
if has_json and http_verb != "POST":
msg = "Json data can only be submitted as POST requests. GET requests should use the 'params' argument."
raise SlackRequestError(msg)
if auth:
auth = BasicAuth(auth["client_id"], auth["client_secret"])
if data is not None and isinstance(data, dict):
data = {k: v for k, v in data.items() if v is not None}
if files is not None and isinstance(files, dict):
files = {k: v for k, v in files.items() if v is not None}
if params is not None and isinstance(params, dict):
params = {k: v for k, v in params.items() if v is not None}
token: Optional[str] = token
if params is not None and "token" in params:
token = params.pop("token")
if json is not None and "token" in json:
token = json.pop("token")
req_args = {
"headers": _get_headers(
headers=headers,
token=token,
has_json=has_json,
has_files=has_files,
request_specific_headers=headers,
),
"data": data,
"files": files,
"params": params,
"json": json,
"ssl": ssl,
"proxy": proxy,
"auth": auth,
}
return req_args
def _files_to_data(req_args: dict) -> List[BinaryIO]:
open_files = []
files = req_args.pop("files", None)
if files is not None:
for k, v in files.items():
if isinstance(v, str):
f = open(v.encode("utf-8", "ignore"), "rb")
open_files.append(f)
req_args["data"].update({k: f})
else:
req_args["data"].update({k: v})
return open_files
async def _request_with_session(
*,
current_session: Optional[ClientSession],
timeout: int,
logger: Logger,
http_verb: str,
api_url: str,
req_args: dict,
) -> Dict[str, any]:
"""Submit the HTTP request with the running session or a new session.
Returns:
A dictionary of the response data.
"""
session = None
use_running_session = current_session and not current_session.closed
if use_running_session:
session = current_session
else:
session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=timeout),
auth=req_args.pop("auth", None),
)
response = None
try:
async with session.request(http_verb, api_url, **req_args) as res:
data = {}
try:
data = await res.json()
except aiohttp.ContentTypeError:
logger.debug(
f"No response data returned from the following API call: {api_url}."
)
except json.decoder.JSONDecodeError as e:
message = f"Failed to parse the response body: {str(e)}"
raise SlackApiError(message, res)
response = {
"data": data,
"headers": res.headers,
"status_code": res.status,
}
finally:
if not use_running_session:
await session.close()
return response