forked from jpetazzo/web2py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnewcron.py
339 lines (299 loc) · 11.6 KB
/
newcron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created by Attila Csipa <[email protected]>
Modified by Massimo Di Pierro <[email protected]>
"""
import sys
import os
import threading
import logging
import time
import sched
import re
import datetime
import platform
import portalocker
import fileutils
import cPickle
from settings import global_settings
logger = logging.getLogger("web2py.cron")
_cron_stopping = False
def absolute_path_link(path):
    """
    Return a normalized absolute path for the destination of a symlink.

    If `path` is a symbolic link, the link target is returned; a relative
    target is interpreted relative to the directory that contains the link.
    If `path` is not a symbolic link, the absolute form of `path` itself is
    returned. Only one level of link is read (os.readlink), the result is
    not resolved recursively.
    """
    if not os.path.islink(path):
        return os.path.abspath(path)
    link = os.readlink(path)
    if not os.path.isabs(link):
        link = os.path.join(os.path.dirname(path), link)
    # Normalize so that the result is really absolute and free of '..'
    # components; otherwise two links to the same directory could yield
    # different strings and not compare equal.
    # NOTE(review): abspath collapses '..' lexically, which can differ from
    # the filesystem's view when an intermediate component is itself a link.
    return os.path.abspath(link)
def stopcron():
    """
    Request a graceful shutdown of cron.

    Sets the module-level flag `_cron_stopping` to True; the function itself
    does not wait for anything.
    """
    global _cron_stopping
    _cron_stopping = True
class extcron(threading.Thread):
    """
    Thread performing a single 'external' cron pass.

    Constructing the object already runs crondance once with startup=True;
    starting the thread runs one more pass with startup=False.
    """

    def __init__(self, applications_parent):
        super(extcron, self).__init__()
        self.path = applications_parent
        self.setDaemon(False)
        # startup pass is executed synchronously, in the creating thread
        crondance(self.path, 'external', startup=True)

    def run(self):
        """Run one cron pass unless a shutdown was requested."""
        if _cron_stopping:
            return
        logger.debug('external cron invocation')
        crondance(self.path, 'external', startup=False)
class hardcron(threading.Thread):
    """
    Daemon thread that triggers a 'hard' cron pass at every minute boundary.

    Constructing the object already runs crondance once with startup=True.
    """

    def __init__(self, applications_parent):
        super(hardcron, self).__init__()
        self.path = applications_parent
        self.setDaemon(True)
        # startup pass is executed synchronously, in the creating thread
        crondance(self.path, 'hard', startup=True)

    def launch(self):
        """Run one cron pass unless a shutdown was requested."""
        if _cron_stopping:
            return
        logger.debug('hard cron invocation')
        crondance(self.path, 'hard', startup=False)

    def run(self):
        """Schedule launch() for the start of each minute until stopped."""
        scheduler = sched.scheduler(time.time, time.sleep)
        logger.info('Hard cron daemon started')
        while not _cron_stopping:
            # seconds remaining until the next full minute
            delay = 60 - time.time() % 60
            scheduler.enter(delay, 1, self.launch, ())
            scheduler.run()
class softcron(threading.Thread):
    """
    Thread performing a single 'soft' cron pass.

    Constructing the object already runs crondance once with startup=True;
    starting the thread runs one more pass with startup=False.
    """

    def __init__(self, applications_parent):
        super(softcron, self).__init__()
        self.path = applications_parent
        # startup pass is executed synchronously, in the creating thread
        crondance(self.path, 'soft', startup=True)

    def run(self):
        """Run one cron pass unless a shutdown was requested."""
        if _cron_stopping:
            return
        logger.debug('soft cron invocation')
        crondance(self.path, 'soft', startup=False)
class Token(object):
    """
    File based token that lets only one cron pass run per minute.

    The state is a pickled tuple (start, stop) stored in the file
    'cron.master' inside `path`; access to the file is serialized with
    portalocker.
    """

    def __init__(self, path):
        self.path = os.path.join(path, 'cron.master')
        if not os.path.exists(self.path):
            fileutils.write_file(self.path, '', 'wb')
        # file object for cron.master; stays None until acquire() opens it
        self.master = None
        # timestamp identifying this token; written as `start` on acquire
        self.now = time.time()

    def acquire(self, startup=False):
        """
        returns the time when the lock is acquired or
        None if cron already running

        lock is implemented by writing a pickle (start, stop) in cron.master
        start is time when cron job starts and stop is time when cron completed
        stop == 0 if job started but did not yet complete
        if a cron job started within less than 60 seconds, acquire returns None
        if a cron job started before 60 seconds and did not stop,
        a warning is issued "Stale cron.master detected"

        with startup=True the 60 seconds check is skipped
        """
        if portalocker.LOCK_EX is None:
            logger.warning('WEB2PY CRON: Disabled because no file locking')
            return None
        self.master = open(self.path, 'rb+')
        ret = None
        try:
            portalocker.lock(self.master, portalocker.LOCK_EX)
            try:
                # NOTE(review): cron.master is unpickled as is; this assumes
                # the file can only be written by web2py itself.
                try:
                    (start, stop) = cPickle.load(self.master)
                except Exception:
                    # empty or unreadable file: behave as if a job
                    # completed long ago
                    (start, stop) = (0, 1)
                if startup or self.now - start > 59.99:
                    if not stop:
                        # this happens if previous cron job longer than 1 minute
                        logger.warning(
                            'WEB2PY CRON: Stale cron.master detected')
                    logger.debug('WEB2PY CRON: Acquiring lock')
                    self.master.seek(0)
                    cPickle.dump((self.now, 0), self.master)
                    # push the data to the file before the lock is dropped,
                    # otherwise another process could still read the old state
                    self.master.flush()
                    ret = self.now
            finally:
                portalocker.unlock(self.master)
        finally:
            if not ret:
                # do this so no need to release
                self.master.close()
        return ret

    def release(self):
        """
        this function writes into cron.master the time when cron job
        was completed

        it does nothing if the token was not acquired (file never opened
        or already closed)
        """
        if self.master is None or self.master.closed:
            return
        try:
            portalocker.lock(self.master, portalocker.LOCK_EX)
            try:
                logger.debug('WEB2PY CRON: Releasing cron lock')
                self.master.seek(0)
                (start, stop) = cPickle.load(self.master)
                if start == self.now:  # if this is my lock
                    self.master.seek(0)
                    cPickle.dump((self.now, time.time()), self.master)
                    # make the completion time visible before unlocking
                    self.master.flush()
            finally:
                portalocker.unlock(self.master)
        finally:
            self.master.close()
def rangetolist(s, period='min'):
    """
    Expand a cron expression of the form 'A-B/N' into a list of integers.

    A leading '*' is first replaced by the full range of the given `period`
    ('min', 'hr', 'dom', 'mon' or 'dow'). The result contains every value
    between A and B (inclusive) that is divisible by N; note that the test
    is `value % N == 0`, it is not counted from A. An expression that does
    not start with 'A-B/N' gives an empty list.
    """
    full_ranges = {
        'min': '0-59',
        'hr': '0-23',
        'dom': '1-31',
        'mon': '1-12',
        'dow': '0-6',
    }
    if s.startswith('*') and period in full_ranges:
        s = s.replace('*', full_ranges[period], 1)
    found = re.match(r'(\d+)-(\d+)/(\d+)', s)
    if not found:
        return []
    first, last, step = [int(group) for group in found.groups()]
    return [value for value in range(first, last + 1) if value % step == 0]
def parsecronline(line):
    """
    Parse one crontab line into a task dictionary.

    The line must contain five time fields (min, hr, dom, mon, dow), a user
    and a command; the shortcuts @reboot, @yearly, @annually, @monthly,
    @weekly, @daily, @midnight and @hourly are accepted in place of the
    time fields. @reboot is encoded as minute -1.

    Returns None if the line has fewer than seven fields. Otherwise returns
    a dict with the keys 'user' and 'cmd' plus, for every time field that is
    not '*', a key named after the field holding the list of allowed values.
    Day of week names (sun..sat, case-insensitive, first three letters) are
    translated to 0..6.
    """
    task = {}
    shortcuts = (
        ('@reboot', '-1 * * * *'),
        ('@yearly', '0 0 1 1 *'),
        ('@annually', '0 0 1 1 *'),
        ('@monthly', '0 0 1 * *'),
        ('@weekly', '0 0 * * 0'),
        ('@daily', '0 0 * * *'),
        ('@midnight', '0 0 * * *'),
        ('@hourly', '0 * * * *'),
    )
    for (alias, schedule) in shortcuts:
        if line.startswith(alias):
            # replace only the leading alias, the same text may legitimately
            # appear again inside the command
            line = line.replace(alias, schedule, 1)
            break
    params = line.strip().split(None, 6)
    if len(params) < 7:
        return None
    daysofweek = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3,
                  'thu': 4, 'fri': 5, 'sat': 6}
    fields = ['min', 'hr', 'dom', 'mon', 'dow']
    for (spec, field) in zip(params[:5], fields):
        if spec in [None, '*']:
            continue
        task[field] = []
        for val in spec.split(','):
            if val != '-1' and '-' in val and '/' not in val:
                # plain range 'a-b' is handled as 'a-b/1'
                val = '%s/1' % val
            if '/' in val:
                task[field] += rangetolist(val, field)
            elif val.isdigit() or val == '-1':
                task[field].append(int(val))
            elif field == 'dow' and val[:3].lower() in daysofweek:
                task[field].append(daysofweek[val[:3].lower()])
    task['user'] = params[5]
    task['cmd'] = params[6]
    return task
class cronlauncher(threading.Thread):
    """
    Thread that runs one command in a subprocess and logs its outcome.

    On Windows `shell` is forced to False; on other systems a command given
    as a list is joined with spaces into a single string.
    """

    def __init__(self, cmd, shell=True):
        super(cronlauncher, self).__init__()
        on_windows = platform.system() == 'Windows'
        if on_windows:
            shell = False
        elif isinstance(cmd, list):
            cmd = ' '.join(cmd)
        self.cmd, self.shell = cmd, shell

    def run(self):
        """Execute the command, wait for it and log its output."""
        import subprocess
        pipe = subprocess.PIPE
        child = subprocess.Popen(self.cmd,
                                 stdin=pipe,
                                 stdout=pipe,
                                 stderr=pipe,
                                 shell=self.shell)
        out, err = child.communicate()
        status = child.returncode
        if status == 0:
            logger.debug('WEB2PY CRON Call returned success:\n%s' % out)
            return
        # non-zero exit status: report both output streams
        logger.warning('WEB2PY CRON Call returned code %s:\n%s'
                       % (status, out + err))
def crondance(applications_parent, ctype='soft', startup=False):
    """
    Run one cron pass over all applications.

    For every directory in <applications_parent>/applications that contains
    a cron/crontab file, the crontab is parsed and each task matching the
    current local time is started in its own cronlauncher thread.

    applications_parent: folder containing 'applications'; the cron.master
        token file is kept in this folder too
    ctype: label of the cron flavour, only used in the log message
    startup: passed to Token.acquire; when True the @reboot tasks
        (minute == -1) are run, when False they are skipped

    Returns None. Nothing is done if the token cannot be acquired. The token
    is released when the pass ends, also if the pass fails with an error.
    """
    apppath = os.path.join(applications_parent, 'applications')
    token = Token(applications_parent)
    if not token.acquire(startup=startup):
        return
    try:
        now_s = time.localtime()
        checks = (('min', now_s.tm_min),
                  ('hr', now_s.tm_hour),
                  ('mon', now_s.tm_mon),
                  ('dom', now_s.tm_mday),
                  # tm_wday has Monday == 0, crontab has Sunday == 0
                  ('dow', (now_s.tm_wday + 1) % 7))

        # command prefix used for tasks executed through web2py; it is the
        # same for every task so it is built only once
        base_commands = [sys.executable]
        w2p_path = fileutils.abspath('web2py.py', gluon=True)
        if os.path.exists(w2p_path):
            base_commands.append(w2p_path)
        if global_settings.applications_parent != global_settings.gluon_parent:
            base_commands.extend(('-f', global_settings.applications_parent))

        apps = [x for x in os.listdir(apppath)
                if os.path.isdir(os.path.join(apppath, x))]
        full_apath_links = set()

        for app in apps:
            if _cron_stopping:
                break
            apath = os.path.join(apppath, app)

            # if app is a symbolic link to other app, skip it
            full_apath_link = absolute_path_link(apath)
            if full_apath_link in full_apath_links:
                continue
            full_apath_links.add(full_apath_link)

            crontab = os.path.join(apath, 'cron', 'crontab')
            if not os.path.exists(crontab):
                continue
            try:
                cronlines = fileutils.readlines_file(crontab, 'rt')
                lines = [x.strip() for x in cronlines
                         if x.strip() and not x.strip().startswith('#')]
                tasks = [parsecronline(cline) for cline in lines]
            except Exception:
                # sys.exc_info() avoids the version specific syntax for
                # binding the exception to a name
                logger.error('WEB2PY CRON: crontab read error %s'
                             % sys.exc_info()[1])
                continue

            for task in tasks:
                if _cron_stopping:
                    break
                # parsecronline returns None for a malformed line; this has
                # to be checked before the task is inspected
                if not task:
                    continue
                task_min = task.get('min', [])
                if task_min == [-1]:
                    # @reboot task: runs only on startup, whatever the time
                    if not startup:
                        continue
                elif any([(k in task and not v in task[k])
                          for (k, v) in checks]):
                    # some time field of the task excludes the current time
                    continue
                logger.info('WEB2PY CRON (%s): %s executing %s in %s at %s'
                            % (ctype, app, task.get('cmd'),
                               os.getcwd(), datetime.datetime.now()))

                # '**command' runs through web2py without models,
                # '*command' runs through web2py with models (-M),
                # anything else is executed as it is
                action, command, models = False, task['cmd'], ''
                if command.startswith('**'):
                    (action, models, command) = (True, '', command[2:])
                elif command.startswith('*'):
                    (action, models, command) = (True, '-M', command[1:])

                if action and command.endswith('.py'):
                    commands = list(base_commands)
                    commands.extend(('-J',                 # cron job
                                     models,               # import models?
                                     '-S', app,            # app name
                                     '-a', '"<recycle>"',  # password
                                     '-R', command))       # command
                    shell = True
                elif action:
                    commands = list(base_commands)
                    commands.extend(('-J',                   # cron job
                                     models,                 # import models?
                                     '-S', app + '/' + command,  # app name
                                     '-a', '"<recycle>"'))   # password
                    shell = True
                else:
                    commands = command
                    shell = False
                try:
                    cronlauncher(commands, shell=shell).start()
                except Exception:
                    logger.warning(
                        'WEB2PY CRON: Execution error for %s: %s'
                        % (task.get('cmd'), sys.exc_info()[1]))
    finally:
        # always record completion, otherwise cron.master keeps stop == 0
        # and the next pass reports a stale token
        token.release()