Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
Major updates
  • Loading branch information
gullpong committed Apr 24, 2016
0 parents commit c9ed627
Show file tree
Hide file tree
Showing 27 changed files with 1,340 additions and 0 deletions.
59 changes: 59 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

# C extensions
*.so

# Distribution / packaging
.Python
env/
env2/
env3/
build/
develop-eggs/
dist/
downloads/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml

# Translations
*.mo
*.pot

# Django stuff:
*.log

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# test code
_test*.py
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
collabo
=======

Framework for Collaborative Multi-task Workers

![collabo](http://farm3.static.flickr.com/2023/2058506789_f3089de203.jpg)
38 changes: 38 additions & 0 deletions collabo/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'''
Collabo - Framework for Multi-task Workers
Jinyong Lee ([email protected])
'''

# ==============================================================================
# Import
# ==============================================================================

from basic_model import BasicModel
from slave_model import SlaveModel
from patrol_model import PatrolModel


# ==============================================================================
# Initialization
# ==============================================================================

import signal
import sys


# NOTE: Handle SIGTERM/SIGBREAK so that it raises SystemExit exception instead
# abruptly aborting the process.
# In this way, we can guarantee all clean-up procedures are executed
# properly (e.g., releasing locks) especially when workers are
# force-stopped.
# This routine runs only once for each process at the moment
# collabo package is imported.
def _sigterm_handler(signum, frame):
raise SystemExit
if sys.platform != 'win32':
# Unix - catch SIGTERM
signal.signal(signal.SIGTERM, _sigterm_handler)
else:
# Windows - catch SIGBREAK
signal.signal(signal.SIGBREAK, _sigterm_handler)
185 changes: 185 additions & 0 deletions collabo/basic_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
'''
Basic Model - Workers Doing Tasks
'''

# ==============================================================================
# Definition
# ==============================================================================

# ------------------------------------------------------------------------------
# Formatting Functions
# ------------------------------------------------------------------------------

STATUS_CODE = {
'BORN': '*',
'DEAD': '-',
'RUNNING': 'R',
'BLOCKED': 'B',
'WAITING': 'W',
'SLEEPING': 'z',
}

def format_state(state, prev_string=''):
string = prev_string
if state['status'] in STATUS_CODE:
st_code = STATUS_CODE[state['status']]
else:
st_code = state['status'][0].upper()
if state['detail']:
st_desc = state['detail'][0].lower()
else:
st_desc = '.'
string += ' {}{}'.format(st_code, st_desc)
return string

def format_workers(workers, prev_string=''):
string = prev_string

# grouping workers with their group names
groups = {}
for worker in workers:
if worker.group not in groups:
groups[worker.group] = []
groups[worker.group].append(worker)

# format grouped worker's state and monitor information
for group, workers in groups.items():
if string:
string += ' '
string += '{}:'.format(group.title())
for worker in workers:
string = format_state(worker.state, string)
monitor = worker.get_info('monitor')
if monitor:
string += ' {{ {} }}'.format(monitor)

return string


# ------------------------------------------------------------------------------
# Basic Model
# ------------------------------------------------------------------------------

from worker import ThreadWorker
from worker import RootWorker
from table import DictTable
from demon import MonitorDemon


class BasicModel(object):
def __init__(
self, logger,
parent=None, suppress_exc=True,
worker_cls=None, table_cls=None):
self.name = 'Main'
self.logger = logger
self.parent = parent
self.suppress_exc = suppress_exc

self.worker_cls = worker_cls or ThreadWorker
self.table_cls = table_cls or DictTable

if not self.parent:
self.parent = RootWorker(self.table_cls(), self.logger)

self.workers = []
self.mon_demon = None

@property
def is_root(self):
return isinstance(self.parent, RootWorker)

def register_workers(self, group, task_func, num=1):
workers = []
for i in xrange(num):
# set label
if num > 1:
label = '{}_{:02}'.format(group.title(), i + 1)
else:
label = group.title()
# create worker instance
worker = self.worker_cls(
group, label,
self.table_cls(),
self.logger.getChild(label),
task_func)
workers.append(worker)
self.workers.append(worker)
return workers

def start_workers(self):
for worker in self.workers:
worker.start()

def wait_workers(self):
for worker in self.workers:
worker.join()

def stop_workers(self):
for worker in self.workers:
worker.kill()

def check_workers(self):
for worker in self.workers:
if worker.state['status'] == 'DEAD':
raise RuntimeError("worker terminated unexpectedly")

def format_monitor(self):
if self.is_root:
# NOTE: Format parent worker's state when it's root
# because it cannot be captured in the worker hierarchy.
string = '{}:'.format(self.name.title())
string = format_state(self.parent.state, string)
else:
string = ''
string = format_workers(self.workers, string)
return string

def enable_monitor(self, logger=None, interval=1.0, mute_threshold=None):
self.mon_demon = MonitorDemon(self, logger, interval, mute_threshold)

def sleep(self, duration, cause=''):
self.parent.sleep(duration, cause)

def run(self, task_func=None):
self.parent.submodel = self
# start monitor demon if available
if self.mon_demon:
self.mon_demon.start()
try:
with self.parent.toggle_state('RUNNING', 'model'):
self.logger.debug("Starting Workers")
self.start_workers()

# do task if given
if task_func:
task_func(self)

self.logger.debug("Waiting for Workers")
self.wait_workers()

except (SystemExit, KeyboardInterrupt):
if self.is_root:
# NOTE: Log system interrupts only when it is the root process
# because they are sent to all child processes.
self.logger.critical(
"Interruption signal received",
exc_info=1)
self.stop_workers()
if not self.suppress_exc:
raise

except Exception:
self.logger.critical(
"Unexpected exception occurred",
exc_info=1)
self.stop_workers()
if not self.suppress_exc:
raise

finally:
self.parent.submodel = None
# stop monitor demon if started
if self.mon_demon:
self.mon_demon.stop()

10 changes: 10 additions & 0 deletions collabo/channel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
'''
Channel Module
'''

# ==============================================================================
# Import
# ==============================================================================

from channel import Channel
from queue_channel import QueueChannel
27 changes: 27 additions & 0 deletions collabo/channel/channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'''
Channel - Worker-shared Message Queue
'''

# ==============================================================================
# Definition
# ==============================================================================

class Channel(object):
def __init__(self, maxsize=None):
# NOTE: Infinite channel size if 'maxsize' is 'None'.
raise NotImplementedError

def size(self):
raise NotImplementedError

def clear(self):
raise NotImplementedError

def send(self, message, block, timeout):
# NOTE: Generally 'message' should be serializable.
raise NotImplementedError

def receive(self, block, timeout):
# NOTE: This method should return 'None' if no message is in the
# channel for the non-blocking mode.
raise NotImplementedError
Loading

0 comments on commit c9ed627

Please sign in to comment.