#-----------------------------------------------------------
# Threaded, Gevent and Prefork Servers
#-----------------------------------------------------------
import datetime
import errno
import logging
import os
import os.path
import platform
import random
import select
import signal
import socket
import subprocess
import sys
import threading
import time
import unittest
from itertools import chain
import psutil
import werkzeug.serving
from werkzeug.debug import DebuggedApplication
from ..tests import loader
if os.name == 'posix':
# Unix only for workers
import fcntl
import resource
try:
import inotify
from inotify.adapters import InotifyTrees
from inotify.constants import IN_MODIFY, IN_CREATE, IN_MOVED_TO
INOTIFY_LISTEN_EVENTS = IN_MODIFY | IN_CREATE | IN_MOVED_TO
except ImportError:
inotify = None
else:
# Windows shim
signal.SIGHUP = -1
inotify = None
if not inotify:
try:
import watchdog
from watchdog.observers import Observer
from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileMovedEvent
except ImportError:
watchdog = None
# Optional process names for workers
try:
from setproctitle import setproctitle
except ImportError:
setproctitle = lambda x: None
import odoo
from odoo.modules import get_modules
from odoo.modules.registry import Registry
from odoo.release import nt_service_name
from odoo.tools import config
from odoo.tools import stripped_sys_argv, dumpstacks, log_ormcache_stats
_logger = logging.getLogger(__name__)
SLEEP_INTERVAL = 60 # 1 min
def memory_info(process):
"""
:return: the relevant memory usage according to the OS in bytes.
"""
# psutil < 2.0 does not have memory_info, >= 3.0 does not have get_memory_info
pmem = (getattr(process, 'memory_info', None) or process.get_memory_info)()
# MacOSX allocates very large vms to all processes so we only monitor the rss usage.
if platform.system() == 'Darwin':
return pmem.rss
return pmem.vms
def set_limit_memory_hard():
if platform.system() == 'Linux' and config['limit_memory_hard']:
rlimit = resource.RLIMIT_AS
soft, hard = resource.getrlimit(rlimit)
resource.setrlimit(rlimit, (config['limit_memory_hard'], hard))
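# A minimal configuration sketch (values are illustrative, not recommendations):
#   ./odoo-bin --limit-memory-soft=2147483648 --limit-memory-hard=2684354560
# limit_memory_hard is enforced above through RLIMIT_AS, while limit_memory_soft
# is checked periodically by ThreadedServer.process_limit() and
# GeventServer.process_limits() below.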
def empty_pipe(fd):
try:
while os.read(fd, 1):
pass
except OSError as e:
if e.errno not in [errno.EAGAIN]:
raise
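# empty_pipe() drains a non-blocking pipe so a later select() only wakes up
# for fresh pings; EAGAIN simply means the pipe is already empty.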
#----------------------------------------------------------
# Werkzeug WSGI servers patched
#----------------------------------------------------------
class LoggingBaseWSGIServerMixIn(object):
def handle_error(self, request, client_address):
t, e, _ = sys.exc_info()
if t == socket.error and e.errno == errno.EPIPE:
# broken pipe, ignore error
return
_logger.exception('Exception happened during processing of request from %s', client_address)
class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
""" werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
uses this class, sets the socket and calls process_request() manually
"""
def __init__(self, app):
werkzeug.serving.BaseWSGIServer.__init__(self, "127.0.0.1", 0, app)
# Directly close the socket. It will be replaced by WorkerHTTP when processing requests
if self.socket:
self.socket.close()
def server_activate(self):
# don't listen as we use PreforkServer#socket
pass
class RequestHandler(werkzeug.serving.WSGIRequestHandler):
def setup(self):
# timeout to avoid chrome headless preconnect during tests
if config['test_enable'] or config['test_file']:
self.timeout = 5
# flag the current thread as handling an http request
super(RequestHandler, self).setup()
me = threading.current_thread()
me.name = 'odoo.service.http.request.%s' % (me.ident,)
def make_environ(self):
environ = super().make_environ()
# Add the TCP socket to environ in order for the websocket
# connections to use it.
environ['socket'] = self.connection
if self.headers.get('Upgrade') == 'websocket':
# Since the Upgrade header was introduced in HTTP/1.1, Firefox
# won't accept a websocket connection if the version is set to
# 1.0.
self.protocol_version = "HTTP/1.1"
return environ
def send_header(self, keyword, value):
# Prevent `WSGIRequestHandler` from sending the connection close header (compatibility with werkzeug >= 2.1.1)
# since it is incompatible with websocket.
if self.headers.get('Upgrade') == 'websocket' and keyword == 'Connection' and value == 'close':
# Do not keep processing requests.
self.close_connection = True
return
super().send_header(keyword, value)
class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
""" werkzeug Threaded WSGI Server patched to allow reusing a listen socket
given by the environment, this is used by autoreload to keep the listen
socket open when a reload happens.
"""
def __init__(self, host, port, app):
# The ODOO_MAX_HTTP_THREADS environment variable allows limiting the number of concurrent
# socket connections accepted by the threaded server, implicitly limiting the number of
# concurrent threads handling http requests.
self.max_http_threads = os.environ.get("ODOO_MAX_HTTP_THREADS")
if self.max_http_threads:
try:
self.max_http_threads = int(self.max_http_threads)
except ValueError:
# If the value can't be parsed as an integer, default to half of db_maxconn:
# most requests won't borrow cursors concurrently, but some controllers
# may allocate two or more cursors.
self.max_http_threads = config['db_maxconn'] // 2
self.http_threads_sem = threading.Semaphore(self.max_http_threads)
super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
handler=RequestHandler)
# See https://github.com/pallets/werkzeug/pull/770
# This allows the request threads to not be set as daemon
# so the server waits for them when shutting down gracefully.
self.daemon_threads = False
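# Usage sketch (shell, illustrative): cap the threaded server at 20 concurrent
# request threads:
#   ODOO_MAX_HTTP_THREADS=20 ./odoo-bin --workers=0
# A non-integer value (e.g. ODOO_MAX_HTTP_THREADS=auto) falls back to
# db_maxconn // 2 as computed above.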
def server_bind(self):
SD_LISTEN_FDS_START = 3
if os.environ.get('LISTEN_FDS') == '1' and os.environ.get('LISTEN_PID') == str(os.getpid()):
self.reload_socket = True
self.socket = socket.fromfd(SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM)
_logger.info('HTTP service (werkzeug) running through socket activation')
else:
self.reload_socket = False
super(ThreadedWSGIServerReloadable, self).server_bind()
_logger.info('HTTP service (werkzeug) running on %s:%s', self.server_name, self.server_port)
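# Socket activation sketch (assuming a systemd-style launcher; illustrative):
#   systemd-socket-activate -l 8069 ./odoo-bin --workers=0
# The launcher sets LISTEN_FDS=1 and LISTEN_PID to our pid, so the branch
# above adopts fd 3 (SD_LISTEN_FDS_START) instead of binding a new socket.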
def server_activate(self):
if not self.reload_socket:
super(ThreadedWSGIServerReloadable, self).server_activate()
def process_request(self, request, client_address):
"""
Start a new thread to process the request.
Override the default method of class socketserver.ThreadingMixIn
so we can keep a reference to the thread that is instantiated
and set its start time as an attribute.
"""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
t.type = 'http'
t.start_time = time.time()
t.start()
# TODO: Remove this method as soon as either of the revision
# - python/cpython@8b1f52b5a93403acd7d112cd1c1bc716b31a418a for Python 3.6,
# - python/cpython@908082451382b8b3ba09ebba638db660edbf5d8e for Python 3.7,
# is included in all Python 3 releases installed on all operating systems supported by Odoo.
# These revisions are included in Python releases 3.6.8 and 3.7.2 respectively.
def _handle_request_noblock(self):
"""
In the python module `socketserver`'s `process_request` loop,
the __shutdown_request flag is not checked between select and accept.
Thus when we set it to `True` via the call to `httpd.shutdown()`,
one last request is accepted before exiting the loop.
We override this function to add an additional check before the accept().
"""
if self._BaseServer__shutdown_request:
return
if self.max_http_threads and not self.http_threads_sem.acquire(timeout=0.1):
# If the semaphore is full, return immediately to the caller (most probably
# socketserver.BaseServer's serve_forever loop), which will retry immediately as the
# selector will find a pending connection to accept on the socket. The 100 ms
# timeout above avoids a cpu-bound loop while waiting for the semaphore.
return
# upstream _handle_request_noblock will handle errors and call shutdown_request in any case
super(ThreadedWSGIServerReloadable, self)._handle_request_noblock()
def shutdown_request(self, request):
if self.max_http_threads:
# upstream is supposed to call this function no matter what happens during processing
self.http_threads_sem.release()
super().shutdown_request(request)
#----------------------------------------------------------
# FileSystem Watcher for autoreload and cache invalidation
#----------------------------------------------------------
class FSWatcherBase(object):
def handle_file(self, path):
if path.endswith('.py') and not os.path.basename(path).startswith('.~'):
try:
with open(path, 'rb') as f: source = f.read() + b'\n'  # close the file instead of leaking the handle
compile(source, path, 'exec')
except IOError:
_logger.error('autoreload: python code change detected, IOError for %s', path)
except SyntaxError:
_logger.error('autoreload: python code change detected, SyntaxError in %s', path)
else:
if not getattr(odoo, 'phoenix', False):
_logger.info('autoreload: python code updated, autoreload activated')
restart()
return True
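# handle_file() returns True once a changed .py file compiles cleanly and a
# restart has been scheduled, letting the inotify event loop
# (FSWatcherInotify.run below) stop early.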
class FSWatcherWatchdog(FSWatcherBase):
def __init__(self):
self.observer = Observer()
for path in odoo.addons.__path__:
_logger.info('Watching addons folder %s', path)
self.observer.schedule(self, path, recursive=True)
def dispatch(self, event):
if isinstance(event, (FileCreatedEvent, FileModifiedEvent, FileMovedEvent)):
if not event.is_directory:
path = getattr(event, 'dest_path', event.src_path)
self.handle_file(path)
def start(self):
self.observer.start()
_logger.info('AutoReload watcher running with watchdog')
def stop(self):
self.observer.stop()
self.observer.join()
class FSWatcherInotify(FSWatcherBase):
def __init__(self):
self.started = False
# ignore warnings from inotify in case we have duplicate addons paths.
inotify.adapters._LOGGER.setLevel(logging.ERROR)
# recreate a list as InotifyTrees' __init__ deletes the list's items
paths_to_watch = []
for path in odoo.addons.__path__:
paths_to_watch.append(path)
_logger.info('Watching addons folder %s', path)
self.watcher = InotifyTrees(paths_to_watch, mask=INOTIFY_LISTEN_EVENTS, block_duration_s=.5)
def run(self):
_logger.info('AutoReload watcher running with inotify')
dir_creation_events = set(('IN_MOVED_TO', 'IN_CREATE'))
while self.started:
for event in self.watcher.event_gen(timeout_s=0, yield_nones=False):
(_, type_names, path, filename) = event
if 'IN_ISDIR' not in type_names:
# despite not having IN_DELETE in the watcher's mask, the
# watcher sends these events when a directory is deleted.
if 'IN_DELETE' not in type_names:
full_path = os.path.join(path, filename)
if self.handle_file(full_path):
return
elif dir_creation_events.intersection(type_names):
full_path = os.path.join(path, filename)
for root, _, files in os.walk(full_path):
for file in files:
if self.handle_file(os.path.join(root, file)):
return
def start(self):
self.started = True
self.thread = threading.Thread(target=self.run, name="odoo.service.autoreload.watcher")
self.thread.daemon = True
self.thread.start()
def stop(self):
self.started = False
self.thread.join()
del self.watcher # ensures inotify watches are freed up before reexec
#----------------------------------------------------------
# Servers: Threaded, Gevented and Prefork
#----------------------------------------------------------
class CommonServer(object):
_on_stop_funcs = []
def __init__(self, app):
self.app = app
# config
self.interface = config['http_interface'] or '0.0.0.0'
self.port = config['http_port']
# runtime
self.pid = os.getpid()
def close_socket(self, sock):
""" Closes a socket instance cleanly
:param sock: the network socket to close
:type sock: socket.socket
"""
try:
sock.shutdown(socket.SHUT_RDWR)
except socket.error as e:
if e.errno == errno.EBADF:
# Werkzeug > 0.9.6 closes the socket itself (see commit
# https://github.com/mitsuhiko/werkzeug/commit/4d8ca089)
return
# On OSX, shutting down one side of a socket shuts down both sides,
# causing an error 57 'Socket is not connected' when the other
# side is shut down in turn, see
# http://bugs.python.org/issue4397
# note: the stdlib fixed the test, not the behavior
if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
raise
sock.close()
@classmethod
def on_stop(cls, func):
""" Register a cleanup function to be executed when the server stops """
cls._on_stop_funcs.append(func)
def stop(self):
for func in type(self)._on_stop_funcs:
try:
_logger.debug("on_close call %s", func)
func()
except Exception:
_logger.warning("Exception in %s", func.__name__, exc_info=True)
class ThreadedServer(CommonServer):
def __init__(self, app):
super(ThreadedServer, self).__init__(app)
self.main_thread_id = threading.current_thread().ident
# Variable keeping track of the number of calls to the signal handler defined
# below. This variable is monitored by ``quit_on_signals()``.
self.quit_signals_received = 0
#self.socket = None
self.httpd = None
self.limits_reached_threads = set()
self.limit_reached_time = None
def signal_handler(self, sig, frame):
if sig in [signal.SIGINT, signal.SIGTERM]:
# shutdown on kill -INT or -TERM
self.quit_signals_received += 1
if self.quit_signals_received > 1:
# logging.shutdown was already called at this point.
sys.stderr.write("Forced shutdown.\n")
os._exit(0)
# interrupt run() to start shutdown
raise KeyboardInterrupt()
elif hasattr(signal, 'SIGXCPU') and sig == signal.SIGXCPU:
sys.stderr.write("CPU time limit exceeded! Shutting down immediately\n")
sys.stderr.flush()
os._exit(0)
elif sig == signal.SIGHUP:
# restart on kill -HUP
odoo.phoenix = True
self.quit_signals_received += 1
# interrupt run() to start shutdown
raise KeyboardInterrupt()
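# Signal cheat-sheet for the threaded server (pid is illustrative):
#   kill -TERM <pid>   graceful shutdown (a second signal forces exit)
#   kill -HUP  <pid>   graceful restart (sets odoo.phoenix)
#   kill -QUIT <pid>   dump thread stacktraces (registered in start() below)
#   kill -USR1 <pid>   log ormcache statistics (registered in start() below)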
def process_limit(self):
memory = memory_info(psutil.Process(os.getpid()))
if config['limit_memory_soft'] and memory > config['limit_memory_soft']:
_logger.warning('Server memory limit (%s) reached.', memory)
self.limits_reached_threads.add(threading.current_thread())
for thread in threading.enumerate():
thread_type = getattr(thread, 'type', None)
if not thread.daemon and thread_type != 'websocket' or thread_type == 'cron':
# We apply the limits on cron threads and on non-daemon,
# non-websocket HTTP request threads. Note the operator precedence above:
# (not daemon and type != 'websocket') or type == 'cron'.
if getattr(thread, 'start_time', None):
thread_execution_time = time.time() - thread.start_time
thread_limit_time_real = config['limit_time_real']
if (getattr(thread, 'type', None) == 'cron' and
config['limit_time_real_cron'] and config['limit_time_real_cron'] > 0):
thread_limit_time_real = config['limit_time_real_cron']
if thread_limit_time_real and thread_execution_time > thread_limit_time_real:
_logger.warning(
'Thread %s virtual real time limit (%d/%ds) reached.',
thread, thread_execution_time, thread_limit_time_real)
self.limits_reached_threads.add(thread)
# Clean-up threads that are no longer alive
# e.g. threads that exceeded their real time,
# but which finished before the server could restart.
for thread in list(self.limits_reached_threads):
if not thread.is_alive():
self.limits_reached_threads.remove(thread)
if self.limits_reached_threads:
self.limit_reached_time = self.limit_reached_time or time.time()
else:
self.limit_reached_time = None
def cron_thread(self, number):
# Steve Reich timing style with thundering herd mitigation.
#
# On startup, all workers bind on a notification channel in
# postgres so they can be woken up at will. At worst they wake
# up every SLEEP_INTERVAL with a jitter. The jitter creates a
# chorus effect that helps distribute over the timeline the moment
# when individual workers wake up.
#
# On NOTIFY, all workers are awakened at the same time; sleeping
# just a bit prevents them from all polling the database at the
# exact same time. This is known as the thundering herd effect.
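# A wakeup can also be triggered manually from SQL (sketch, e.g. via psql):
#   NOTIFY cron_trigger;
# which makes the select() below return before SLEEP_INTERVAL elapses.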
from odoo.addons.base.models.ir_cron import ir_cron
conn = odoo.sql_db.db_connect('postgres')
with conn.cursor() as cr:
pg_conn = cr._cnx
# LISTEN / NOTIFY doesn't work in recovery mode
cr.execute("SELECT pg_is_in_recovery()")
in_recovery = cr.fetchone()[0]
if not in_recovery:
cr.execute("LISTEN cron_trigger")
else:
_logger.warning("PG cluster in recovery mode, cron trigger not activated")
cr.commit()
while True:
select.select([pg_conn], [], [], SLEEP_INTERVAL + number)
time.sleep(number / 100)
pg_conn.poll()
registries = odoo.modules.registry.Registry.registries
_logger.debug('cron%d polling for jobs', number)
for db_name, registry in registries.d.items():
if registry.ready:
thread = threading.current_thread()
thread.start_time = time.time()
try:
ir_cron._process_jobs(db_name)
except Exception:
_logger.warning('cron%d encountered an Exception:', number, exc_info=True)
thread.start_time = None
def cron_spawn(self):
""" Start the above runner function in a daemon thread.
The thread is a typical daemon thread: it never quits and is simply
terminated when the main process exits, with no consequence (the
processing threads it spawns are not marked daemon).
"""
# Force call to strptime just before starting the cron thread
# to prevent time.strptime AttributeError within the thread.
# See: http://bugs.python.org/issue7980
datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
for i in range(odoo.tools.config['max_cron_threads']):
# Pass i as an argument: a closure over the loop variable would be
# late-bound and could run with a stale value once the loop advances.
t = threading.Thread(target=self.cron_thread, args=(i,), name="odoo.service.cron.cron%d" % i)
t.daemon = True
t.type = 'cron'
t.start()
_logger.debug("cron%d started!" % i)
def http_thread(self):
self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, self.app)
self.httpd.serve_forever()
def http_spawn(self):
t = threading.Thread(target=self.http_thread, name="odoo.service.httpd")
t.daemon = True
t.start()
def start(self, stop=False):
_logger.debug("Setting signal handlers")
set_limit_memory_hard()
if os.name == 'posix':
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGCHLD, self.signal_handler)
signal.signal(signal.SIGHUP, self.signal_handler)
signal.signal(signal.SIGXCPU, self.signal_handler)
signal.signal(signal.SIGQUIT, dumpstacks)
signal.signal(signal.SIGUSR1, log_ormcache_stats)
elif os.name == 'nt':
import win32api
win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)
test_mode = config['test_enable'] or config['test_file']
if test_mode or (config['http_enable'] and not stop):
# some tests need the http daemon to be available...
self.http_spawn()
def stop(self):
""" Shutdown the WSGI server. Wait for non daemon threads.
"""
if getattr(odoo, 'phoenix', None):
_logger.info("Initiating server reload")
else:
_logger.info("Initiating shutdown")
_logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
stop_time = time.time()
if self.httpd:
self.httpd.shutdown()
super().stop()
# Manually join() all threads before calling sys.exit() to allow a second signal
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
# threading.Thread.join() should not mask signals (at least in python 2.5).
me = threading.current_thread()
_logger.debug('current thread: %r', me)
for thread in threading.enumerate():
_logger.debug('process %r (%r)', thread, thread.daemon)
if (thread != me and not thread.daemon and thread.ident != self.main_thread_id and
thread not in self.limits_reached_threads):
while thread.is_alive() and (time.time() - stop_time) < 1:
# We wait for requests to finish, up to 1 second.
_logger.debug('join and sleep')
# Need a busyloop here as thread.join() masks signals
# and would prevent the forced shutdown.
thread.join(0.05)
time.sleep(0.05)
odoo.sql_db.close_all()
_logger.debug('--')
logging.shutdown()
def run(self, preload=None, stop=False):
""" Start the http server and the cron thread then wait for a signal.
The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
a second one, if any, will force an immediate exit.
"""
self.start(stop=stop)
rc = preload_registries(preload)
if stop:
if config['test_enable']:
logger = odoo.tests.result._logger
with Registry.registries._lock:
for db, registry in Registry.registries.d.items():
report = registry._assertion_report
log = logger.error if not report.wasSuccessful() \
else logger.warning if not report.testsRun \
else logger.info
log("%s when loading database %r", report, db)
self.stop()
return rc
self.cron_spawn()
# Wait for a first signal to be handled. (time.sleep will be interrupted
# by the signal handler)
try:
while self.quit_signals_received == 0:
self.process_limit()
if self.limit_reached_time:
has_other_valid_requests = any(
not t.daemon and
t not in self.limits_reached_threads
for t in threading.enumerate()
if getattr(t, 'type', None) == 'http')
if (not has_other_valid_requests or
(time.time() - self.limit_reached_time) > SLEEP_INTERVAL):
# We wait until no requests are being processed
# other than the ones exceeding the limits, up to 1 min,
# before asking for a reload.
_logger.info('Dumping stacktrace of limit exceeding threads before reloading')
dumpstacks(thread_idents=[thread.ident for thread in self.limits_reached_threads])
self.reload()
# `reload` increments `self.quit_signals_received`
# and the loop will end after this iteration,
# therefore leading to the server stop.
# `reload` also sets the `phoenix` flag
# to tell the server to restart itself after shutting down.
else:
time.sleep(1)
else:
time.sleep(SLEEP_INTERVAL)
except KeyboardInterrupt:
pass
self.stop()
def reload(self):
os.kill(self.pid, signal.SIGHUP)
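# reload() is equivalent to signalling the server from a shell (pid is
# illustrative): kill -HUP <server_pid>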
class GeventServer(CommonServer):
def __init__(self, app):
super(GeventServer, self).__init__(app)
self.port = config['gevent_port']
self.httpd = None
def process_limits(self):
restart = False
if self.ppid != os.getppid():
_logger.warning("Gevent Parent changed: %s", self.pid)
restart = True
memory = memory_info(psutil.Process(self.pid))
if config['limit_memory_soft'] and memory > config['limit_memory_soft']:
_logger.warning('Gevent virtual memory limit reached: %s', memory)
restart = True
if restart:
# suicide !!
os.kill(self.pid, signal.SIGTERM)
def watchdog(self, beat=4):
import gevent
self.ppid = os.getppid()
while True:
self.process_limits()
gevent.sleep(beat)
def start(self):
import gevent
try:
from gevent.pywsgi import WSGIServer, WSGIHandler
except ImportError:
from gevent.wsgi import WSGIServer, WSGIHandler
class ProxyHandler(WSGIHandler):
""" When logging requests, try to get the client address from
the environment so we get proxyfix's modifications (if any).
Derived from werkzeug.serving.WSGIRequestHandler.log
/ werkzeug.serving.WSGIRequestHandler.address_string
"""
def _connection_upgrade_requested(self):
if self.headers.get('Connection', '').lower() == 'upgrade':
return True
if self.headers.get('Upgrade', '').lower() == 'websocket':
return True
return False
def format_request(self):
old_address = self.client_address
if getattr(self, 'environ', None):
self.client_address = self.environ['REMOTE_ADDR']
elif not self.client_address:
self.client_address = '<local>'
# other cases are handled inside WSGIHandler
try:
return super().format_request()
finally:
self.client_address = old_address
def finalize_headers(self):
# We need to make gevent.pywsgi stop dealing with chunks when the connection
# is being upgraded, see https://github.com/gevent/gevent/issues/1712
super().finalize_headers()
if self.code == 101:
# Switching Protocols. Disable chunked writes.
self.response_use_chunked = False
def get_environ(self):
# Add the TCP socket to environ in order for the websocket
# connections to use it.
environ = super().get_environ()
environ['socket'] = self.socket
# Disable support for HTTP chunking on reads, which causes
# an issue when the connection is being upgraded, see
# https://github.com/gevent/gevent/issues/1712
if self._connection_upgrade_requested():
environ['wsgi.input'] = self.rfile
environ['wsgi.input_terminated'] = False
return environ
set_limit_memory_hard()
if os.name == 'posix':
# Register debugging signal handlers and spawn the limit-enforcing watchdog
signal.signal(signal.SIGQUIT, dumpstacks)
signal.signal(signal.SIGUSR1, log_ormcache_stats)
gevent.spawn(self.watchdog)
self.httpd = WSGIServer(
(self.interface, self.port), self.app,
log=logging.getLogger('longpolling'),
error_log=logging.getLogger('longpolling'),
handler_class=ProxyHandler,
)
_logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
try:
self.httpd.serve_forever()
except:
_logger.exception("Evented Service (longpolling): uncaught error during main loop")
raise
def stop(self):
import gevent
self.httpd.stop()
super().stop()
gevent.shutdown()
def run(self, preload, stop):
self.start()
self.stop()
class PreforkServer(CommonServer):
""" Multiprocessing inspired by (g)unicorn.
PreforkServer (aka Multicorn) currently uses accept(2) as the dispatching
method between workers, but we plan to replace it with a more intelligent
dispatcher that will parse the first HTTP request line.
"""
def __init__(self, app):
super().__init__(app)
# config
self.population = config['workers']
self.timeout = config['limit_time_real']
self.limit_request = config['limit_request']
self.cron_timeout = config['limit_time_real_cron'] or None
if self.cron_timeout == -1:
self.cron_timeout = self.timeout
# working vars
self.beat = 4
self.socket = None
self.workers_http = {}
self.workers_cron = {}
self.workers = {}
self.generation = 0
self.queue = []
self.long_polling_pid = None
def pipe_new(self):
pipe = os.pipe()
for fd in pipe:
# non_blocking
flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
# close_on_exec
flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFD, flags)
return pipe
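# pipe_new() builds the pipes used for the classic "self-pipe trick": a
# non-blocking, close-on-exec pipe lets the signal handler wake up the
# select() in sleep() without racing the main loop.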
def pipe_ping(self, pipe):
try:
os.write(pipe[1], b'.')
except IOError as e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise
def signal_handler(self, sig, frame):
if len(self.queue) < 5 or sig == signal.SIGCHLD:
self.queue.append(sig)
self.pipe_ping(self.pipe)
else:
_logger.warning("Dropping signal: %s", sig)
def worker_spawn(self, klass, workers_registry):
self.generation += 1
worker = klass(self)
pid = os.fork()
if pid != 0:
worker.pid = pid
self.workers[pid] = worker
workers_registry[pid] = worker
return worker
else:
worker.run()
sys.exit(0)
def long_polling_spawn(self):
nargs = stripped_sys_argv()
cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:]
popen = subprocess.Popen(cmd)
self.long_polling_pid = popen.pid
def worker_pop(self, pid):
if pid == self.long_polling_pid:
self.long_polling_pid = None
if pid in self.workers:
_logger.debug("Worker (%s) unregistered", pid)
try:
self.workers_http.pop(pid, None)
self.workers_cron.pop(pid, None)
u = self.workers.pop(pid)
u.close()
except OSError:
return
def worker_kill(self, pid, sig):
try:
os.kill(pid, sig)
except OSError as e:
if e.errno == errno.ESRCH:
self.worker_pop(pid)
def process_signals(self):
while len(self.queue):
sig = self.queue.pop(0)
if sig in [signal.SIGINT, signal.SIGTERM]:
raise KeyboardInterrupt
elif sig == signal.SIGHUP:
# restart on kill -HUP
odoo.phoenix = True
raise KeyboardInterrupt
elif sig == signal.SIGQUIT:
# dump stacks on kill -3
dumpstacks()
elif sig == signal.SIGUSR1:
# log ormcache stats on kill -SIGUSR1
log_ormcache_stats()
elif sig == signal.SIGTTIN:
# increase number of workers
self.population += 1
elif sig == signal.SIGTTOU:
# decrease number of workers
self.population -= 1
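# The target worker count can be changed at runtime (pid is illustrative):
#   kill -TTIN <master_pid>   # increase self.population by one
#   kill -TTOU <master_pid>   # decrease self.population by one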
def process_zombie(self):
# reap dead workers
while 1:
try:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid:
break
if (status >> 8) == 3:
msg = "Critial worker error (%s)"
_logger.critical(msg, wpid)
raise Exception(msg % wpid)
self.worker_pop(wpid)
except OSError as e:
if e.errno == errno.ECHILD:
break
raise
def process_timeout(self):
now = time.time()
for (pid, worker) in self.workers.items():
if worker.watchdog_timeout is not None and \
(now - worker.watchdog_time) >= worker.watchdog_timeout:
_logger.error("%s (%s) timeout after %ss",
worker.__class__.__name__,
pid,
worker.watchdog_timeout)
self.worker_kill(pid, signal.SIGKILL)
def process_spawn(self):
if config['http_enable']:
while len(self.workers_http) < self.population:
self.worker_spawn(WorkerHTTP, self.workers_http)
if not self.long_polling_pid:
self.long_polling_spawn()
while len(self.workers_cron) < config['max_cron_threads']:
self.worker_spawn(WorkerCron, self.workers_cron)
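# process_spawn() is idempotent: it only tops the pools up to their target
# sizes, so the main loop can safely call it on every beat.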
def sleep(self):
try:
# map of fd -> worker
fds = {w.watchdog_pipe[0]: w for w in self.workers.values()}
fd_in = list(fds) + [self.pipe[0]]
# check for ping or internal wakeups
ready = select.select(fd_in, [], [], self.beat)
# update worker watchdogs
for fd in ready[0]:
if fd in fds:
fds[fd].watchdog_time = time.time()
empty_pipe(fd)
except select.error as e:
if e.args[0] not in [errno.EINTR]:
raise
def start(self):
# wakeup pipe: python doesn't raise EINTR when a syscall is interrupted
# by a signal, simulating a pseudo SA_RESTART. We write to a pipe in the
# signal handler to overcome this behaviour.
self.pipe = self.pipe_new()
# set signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGHUP, self.signal_handler)
signal.signal(signal.SIGCHLD, self.signal_handler)
signal.signal(signal.SIGTTIN, self.signal_handler)
signal.signal(signal.SIGTTOU, self.signal_handler)
signal.signal(signal.SIGQUIT, dumpstacks)
signal.signal(signal.SIGUSR1, log_ormcache_stats)
if config['http_enable']:
# listen to socket
_logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setblocking(0)
self.socket.bind((self.interface, self.port))
self.socket.listen(8 * self.population)
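# The listen backlog scales with the pool size (8 pending connections per
# worker); forked workers inherit this socket and accept() on it directly.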
def stop(self, graceful=True):
if self.long_polling_pid is not None:
# FIXME make longpolling process handle SIGTERM correctly
self.worker_kill(self.long_polling_pid, signal.SIGKILL)
self.long_polling_pid = None
if self.socket:
self.socket.close()
if graceful:
_logger.info("Stopping gracefully")
super().stop()
limit = time.time() + self.timeout
for pid in self.workers:
self.worker_kill(pid, signal.SIGINT)
while self.workers and time.time() < limit:
try:
self.process_signals()
except KeyboardInterrupt:
_logger.info("Forced shutdown.")
break
self.process_zombie()
time.sleep(0.1)
else:
_logger.info("Stopping forcefully")
for pid in self.workers:
self.worker_kill(pid, signal.SIGTERM)
def run(self, preload, stop):
self.start()
rc = preload_registries(preload)
if stop:
self.stop()
return rc
# Empty the cursor pool: we don't want cursors to be shared among forked workers.
odoo.sql_db.close_all()
_logger.debug("Multiprocess starting")
while 1:
try:
#_logger.debug("Multiprocess beat (%s)",time.time())
self.process_signals()
self.process_zombie()
self.process_timeout()
self.process_spawn()
self.sleep()
except KeyboardInterrupt:
_logger.debug("Multiprocess clean stop")
self.stop()
break
except Exception as e:
_logger.exception(e)
self.stop(False)
return -1
class Worker(object):
""" Workers """
def __init__(self, multi):
self.multi = multi
self.watchdog_time = time.time()
self.watchdog_pipe = multi.pipe_new()
self.eintr_pipe = multi.pipe_new()
self.wakeup_fd_r, self.wakeup_fd_w = self.eintr_pipe
# Can be set to None if no watchdog is desired.
self.watchdog_timeout = multi.timeout
self.ppid = os.getpid()
self.pid = None
self.alive = True
# should we rename into lifetime ?
self.request_max = multi.limit_request