forked from O365/python-o365
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtoken_backends.py
177 lines (152 loc) · 7.55 KB
/
token_backends.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import time
import logging
import random
from portalocker import Lock
from portalocker.exceptions import LockException
from O365.utils import FirestoreBackend, FileSystemTokenBackend
# Module-level logger used by the token backends defined in this file.
log = logging.getLogger(__name__)
# Token backends that implement the 'should_refresh_token' method so that
# concurrent instances can coordinate who refreshes the token.
class LockableFirestoreBackend(FirestoreBackend):
    """
    A firestore backend that can answer to
    'should_refresh_token'. Synchronous.

    A flag stored in the same document as the token (under the field named
    by 'refresh_flag_field_name') is used to decide whether this instance
    refreshes the token or waits for another instance to do it.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the backend.

        :param args: positional arguments forwarded to the parent backend
        :param str refresh_flag_field_name: (keyword, required) name of the
         document field that holds the refresh flag
        :param int max_tries: (keyword) max number of flag checks done while
         waiting for another instance to refresh. Defaults to 5
        :param float factor: (keyword) incremental back off factor, in
         seconds. Defaults to 1.5
        :param kwargs: remaining keyword arguments forwarded to the parent
         backend
        :raises ValueError: if 'refresh_flag_field_name' is missing or None
        """
        # popped (like the other options of this subclass) so that it is
        # not forwarded to the parent constructor
        self.refresh_flag_field_name = kwargs.pop('refresh_flag_field_name',
                                                  None)
        if self.refresh_flag_field_name is None:
            raise ValueError('Must provide the db field name of the refresh token flag')
        self.max_tries = kwargs.pop('max_tries', 5)  # max db calls
        self.factor = kwargs.pop('factor', 1.5)  # incremental back off factor
        super().__init__(*args, **kwargs)

    def _take_refresh_action(self):
        """
        Ask for permission to refresh the token.

        NOTE: this is a placeholder that always returns True. A real
        implementation should transactionally get the flag and set it to
        False if it is True.

        :return: True if this instance has set the flag to False (and so it
         must refresh the token), False if the flag was already False
        :rtype: bool
        """
        resolution = True  # example...
        return resolution

    def _check_refresh_flag(self):
        """ Returns the token if the flag is True or None otherwise"""
        try:
            doc = self.doc_ref.get()
        except Exception as e:
            # best effort: a failed read is logged and treated as "no token"
            log.error('Flag (collection: %s, doc_id: %s) '
                      'could not be retrieved from the backend: %s',
                      self.collection, self.doc_id, e)
            return None

        if not (doc and doc.exists):
            return None
        if not doc.get(self.refresh_flag_field_name):
            # flag not set: no freshly saved token to pick up
            return None

        token_str = doc.get(self.field_name)
        if not token_str:
            return None
        return self.token_constructor(self.serializer.loads(token_str))

    def should_refresh_token(self, con=None):
        """
        Decide whether this instance has to refresh the access token.

        :param con: not used by this backend
        :return: True if the caller should refresh the token, False if a
         different token was found in the store and loaded into
         'self.token'
        :rtype: bool
        """
        # 1) check if the token is already a new one:
        new_token = self.load_token()
        # guard against an in-memory token that was never set
        current_access = self.token.get('access_token') if self.token else None
        if new_token and new_token.get('access_token') != current_access:
            # The token is different. Store it and return False
            self.token = new_token
            return False

        # 2) ask if you can take the action of refreshing the access token
        if self._take_refresh_action():
            # we have updated the flag and can now begin to refresh the token
            return True

        # 3) we must wait until the refresh is done by another instance
        tries = 0
        while True:
            tries += 1
            # random wait between 0 and factor * 2 ** (tries - 1) seconds
            value = self.factor * 2 ** (tries - 1)
            seconds = random.uniform(0, value)
            # we sleep first as _take_refresh_action already checked the flag
            time.sleep(seconds)

            # 4) Check for the flag. if returns a token then is the new token.
            token = self._check_refresh_flag()
            if token is not None:
                # store the token and leave
                self.token = token
                return False

            # '>=' (not '==') so that a max_tries below 1 can not loop forever
            if tries >= self.max_tries:
                # we tried and didn't get a result.
                return True

    def save_token(self):
        """
        We must overwrite this method to update also the flag to True

        :return: True if the token was saved, False if saving failed
        :rtype: bool
        :raises ValueError: if no token is set
        """
        if self.token is None:
            raise ValueError('You have to set the "token" first.')

        try:
            # set token will overwrite previous data
            self.doc_ref.set({
                self.field_name: self.serializer.dumps(self.token),
                self.refresh_flag_field_name: True
            })
        except Exception as e:
            log.error('Token could not be saved: %s', e)
            return False

        return True
class LockableFileSystemTokenBackend(FileSystemTokenBackend):
    """
    GH #350
    A token backend that ensures atomic operations when working with tokens
    stored on a file system. Avoids concurrent instances of O365 racing
    to refresh the same token file. It does this by wrapping the token refresh
    method in the Portalocker package's Lock class, which itself is a wrapper
    around Python's fcntl and win32con.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the backend.

        :param args: positional arguments forwarded to the parent backend
        :param int max_tries: (keyword) how many times 'should_refresh_token'
         tries to get hold of the token file before giving up. Defaults to 5
        :param kwargs: remaining keyword arguments forwarded to the parent
         backend
        """
        # a default avoids a bare KeyError when the option is omitted
        self.max_tries = kwargs.pop('max_tries', 5)
        # True while this instance is waiting for a locked token file
        self.fs_wait = False
        super().__init__(*args, **kwargs)

    def should_refresh_token(self, con=None):
        """
        Method for refreshing the token when there are concurrently running
        O365 instances. Determines if we need to call the MS server and refresh
        the token and its file, or if another Connection instance has already
        updated it and we should just load that updated token from the file.

        It will always return False, None, OR raise an error if a token file
        couldn't be accessed after X tries. That is because this method
        completely handles token refreshing via the passed Connection object
        argument. If it determines that the token should be refreshed, it locks
        the token file, calls the Connection's 'refresh_token' method (which
        loads the fresh token from the server into memory and the file), then
        unlocks the file. Since refreshing has been taken care of, the calling
        method does not need to refresh and we return None.

        If we are blocked because the file is locked, that means another
        instance is using it. We'll change the backend's state to waiting,
        sleep for 2 seconds, reload a token into memory from the file (since
        another process is using it, we can assume it's being updated), and
        loop again.

        If this newly loaded token is not expired, the other instance loaded
        a new token to file, and we can happily move on and return False.
        (since we don't need to refresh the token anymore). If the same token
        was loaded into memory again and is still expired, that means it wasn't
        updated by the other instance yet. Try accessing the file again for X
        more times. If we don't succeed after the loop has terminated, raise a
        runtime exception

        :param con: object whose 'refresh_token' method is called when the
         token has to be refreshed; needed whenever the token is expired
        :return: None if the token was refreshed here, False if the token in
         memory is not expired
        :raises RuntimeError: if 'con.refresh_token()' returns False or if
         the token file is still locked after 'max_tries' tries
        """
        for remaining in range(self.max_tries, 0, -1):
            if not self.token.is_access_expired:
                log.info('Token was refreshed by another instance...')
                self.fs_wait = False
                return False

            try:
                # fail right away (no waiting) if the file is already locked
                with Lock(self.token_path, 'r+',
                          fail_when_locked=True, timeout=0):
                    log.debug('Locked oauth token file')
                    if con.refresh_token() is False:
                        raise RuntimeError('Token Refresh Operation not '
                                           'working')
                    log.info('New oauth token fetched')
                log.debug('Unlocked oauth token file')
                return None
            except LockException:
                self.fs_wait = True
                log.warning('Oauth file locked. Sleeping for 2 seconds...'
                            f'retrying {remaining - 1} more times.')
                time.sleep(2)
                log.debug('Waking up and rechecking token file for update'
                          ' from other instance...')
                self.token = self.load_token()

        # if we exit the loop, that means we were locked out of the file after
        # multiple retries give up and throw an error - something isn't right
        raise RuntimeError('Could not access locked token file after '
                           f'{self.max_tries} tries')