Skip to content

Commit

Permalink
Refactored to match updated coding standard.
Browse files Browse the repository at this point in the history
  • Loading branch information
grigoryvp committed Jun 14, 2013
1 parent 84c4f90 commit 164c658
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 119 deletions.
30 changes: 15 additions & 15 deletions parabridge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,53 +32,53 @@
HELP_TASK_LIST = """Displays list of added tasks."""


def start( _ ):
  """Starts the bridge daemon as a detached child process.

  Args:
    _: Unused (uniform argparse subcommand handler signature).
  """
  ## Local import: only this handler needs it.
  import sys
  sFile = os.path.join( os.path.dirname( __file__ ), "parabridge_daemon.py" )
  ## |sys.executable| instead of a bare 'python' so the daemon runs under
  ## the same interpreter (and virtualenv) as this script.
  subprocess.Popen( [ sys.executable, sFile, str( info.COMM_PORT ) ] )


def stop( _ ):
  """Asks a running daemon to terminate.

  A daemon that is not running (connection refused) is silently ignored:
  the desired end state is already reached.
  """
  try:
    xmlrpclib.ServerProxy( info.COMM_ADDR ).stop()
  except socket.error:
    ## No daemon to talk to: nothing to stop.
    pass


def status( _ ):
  """Prints the status report of a running daemon, or a stub message if
  the daemon can't be reached.
  """
  try:
    print( xmlrpclib.ServerProxy( info.COMM_ADDR ).status() )
  except socket.error:
    print( "Daemon is not running." )


def task_add( m_args ):
  """Registers a new synchronization task from parsed command-line args.

  Args:
    m_args: Dict with 'task_name', 'task_src' and 'task_dst' keys.
  """
  sTask = m_args[ 'task_name' ]
  fAdded = settings.instance.taskAdd(
    sTask, m_args[ 'task_src' ], m_args[ 'task_dst' ] )
  if not fAdded:
    logging.warning( "Already has '{0}' task".format( sTask ) )


def task_del( m_args ):
  """Removes a task by name, warning if no such task exists.

  Args:
    m_args: Dict with a 'task_name' key.
  """
  sTask = m_args[ 'task_name' ]
  if not settings.instance.taskDelByName( sTask ):
    logging.warning( "No task named '{0}'".format( sTask ) )


def task_list( _ ):
  """Prints every configured task with its source and destination paths."""
  lTasks = settings.instance.taskList()
  if not lTasks:
    print( "Tasks list is empty." )
    return
  sFmt = "{0}\n Source: {1}\n Destination: {2}"
  for mTask in lTasks:
    print( sFmt.format( mTask[ 'name' ], mTask[ 'src' ], mTask[ 'dst' ] ) )


def main() :
def main():
settings.instance.init( f_notify = True )
oParser = argparse.ArgumentParser( description = HELP_APP )
oSubparsers = oParser.add_subparsers()
Expand Down
4 changes: 2 additions & 2 deletions parabridge/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
NAME_SHORT = "parabridge"
VER_MAJOR = 0
VER_MINOR = 1
try :
try:
VER_TXT = pkg_resources.require( NAME_SHORT )[ 0 ].version
## Installing via 'setup.py develop'?
except pkg_resources.DistributionNotFound :
except pkg_resources.DistributionNotFound:
VER_BUILD = 0
VER_TXT = ".".join( map( str, [ VER_MAJOR, VER_MINOR, VER_BUILD ] ) )
DIR_THIS = os.path.dirname( os.path.abspath( __file__ ) )
Expand Down
112 changes: 56 additions & 56 deletions parabridge/parabridge_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import settings


class Worker( threading.Thread ) :
class Worker( threading.Thread ):

_instance_o = None


def __init__( self ) :
def __init__( self ):
super( Worker, self ).__init__()
self._shutdown_f = False
self._shutdown_o = threading.Event()
Expand All @@ -34,49 +34,49 @@ def __init__( self ) :
self._timeReloadLast_o = None


def run( self ):
  """Worker thread main loop.

  Reloads the task list whenever the configuration is flagged as changed,
  then processes every task, until |shutdown()| is called.
  """
  while not self._shutdown_f:
    lTasks = []
    ## NOTE(review): attribute keeps the original 'CH' capitalization typo
    ## since it is shared with |cfgChanged()| defined elsewhere.
    if self._cfgCHanged_f:
      lTasks = settings.instance.taskList()
      self._cfgCHanged_f = False
      self._timeReloadLast_o = time.localtime()
    for mTask in lTasks:
      sSrc = os.path.expanduser( mTask[ 'src' ] )
      sDst = os.path.expanduser( mTask[ 'dst' ] )
      self.processTask( mTask[ 'guid' ], mTask[ 'name' ], sSrc, sDst )
    ## Throttle so we don't overuse HDD and CPU. Waiting on the shutdown
    ## event (instead of |time.sleep|) lets |shutdown()| interrupt the
    ## pause immediately instead of up to a second later.
    self._shutdown_o.wait( 1 )


def processTask( self, s_guid, s_name, s_src, s_dst ) :
def processTask( self, s_guid, s_name, s_src, s_dst ):

def setRes( i_sTxt ) :
def setRes( i_sTxt ):
self._results_m[ s_name ] = i_sTxt
return False

if not os.path.exists( s_src ) :
if not os.path.exists( s_src ):
return setRes( "Path \"{0}\" not found.".format( s_src ) )
if not os.path.isdir( s_src ) :
if not os.path.isdir( s_src ):
return setRes( "Path \"{0}\" is not a directory.".format( s_src ) )
try :
try:
os.makedirs( os.path.dirname( s_dst ) )
except OSError :
except OSError:
pass

lSrcFiles = [ s_src + os.sep + s for s in os.listdir( s_src ) ]
lSrcFiles = [ s for s in lSrcFiles if os.path.isfile( s ) ]
lSrcFiles = [ s for s in lSrcFiles if re.search( "(?i)\.db$", s ) ]
if 0 == len( lSrcFiles ) :
if 0 == len( lSrcFiles ):
return setRes( "No .db files in \"{0}\".".format( s_src ) )
lProcessed = []
nTotal = len( lSrcFiles )
with sqlite3.connect( s_dst ) as oConn :
for i, sSrcFile in enumerate( lSrcFiles ) :
with sqlite3.connect( s_dst ) as oConn:
for i, sSrcFile in enumerate( lSrcFiles ):
setRes( "Processing {0}/{1}".format( i + 1, nTotal ) )
if self.processParadoxFile( s_guid, sSrcFile, oConn ) :
if self.processParadoxFile( s_guid, sSrcFile, oConn ):
lProcessed.append( True )
if self._shutdown_f :
if self._shutdown_f:
return
## Sleep some time so we don't overuse HDD and CPU.
time.sleep( 1 )
Expand All @@ -87,62 +87,62 @@ def setRes( i_sTxt ) :

def processParadoxFile( self, s_guid, s_src, o_conn ):
  """Processes one Paradox |.db| file and synchronizes the SQLite
  database with it.

  Args:
    s_guid: Task GUID used to persist per-file resume indexes.
    s_src: Path to the Paradox |.db| file.
    o_conn: Open SQLite connection to write into.

  Returns:
    True on success (including "nothing new to do"), False if the table
    layout is unsupported or a shutdown interrupted the parse.
  """
  try:
    sFile = os.path.basename( s_src )
    nIndexLast = settings.instance.indexLastGet( s_guid, sFile )
    mArgs = { 'shutdown': self._shutdown_o }
    ## Not the first parse of this file? Resume right after the last
    ## record already synchronized.
    if nIndexLast is not None:
      mArgs[ 'start' ] = nIndexLast + 1
    oDb = pyparadox.open( s_src, ** mArgs )
    ## Only tables whose first field is an autoincrement can be handled
    ## (that field is required to detect updates).
    if len( oDb.fields ) < 1 or not oDb.fields[ 0 ].IsAutoincrement():
      return False
    ## Table empty or not updated since saved last index.
    if 0 == len( oDb.records ):
      return True
    for oRecord in oDb.records:
      nIndex = oRecord.fields[ 0 ]
      ## Autoincrement values must be strictly increasing past the
      ## saved resume point.
      if nIndexLast is not None and nIndexLast >= nIndex:
        raise Exception( "Consistency error." )
      nIndexLast = nIndex
      self.processParadoxRecord( oDb, oRecord, o_conn, sFile )
    settings.instance.indexLastSet( s_guid, sFile, nIndexLast )
  except pyparadox.Shutdown:
    return False
  return True


def processParadoxRecord( self, o_db, o_record, o_conn, s_file ) :
def FieldName( i_sParadoxName ) :
def processParadoxRecord( self, o_db, o_record, o_conn, s_file ):
def FieldName( i_sParadoxName ):
##! Paradox fields may be named like 'index' that is not a valid
## name for SQLite.
return 'f_{0}'.format( i_sParadoxName.lower() )
def FieldKey( i_sParadoxName ) :
def FieldKey( i_sParadoxName ):
return ':{0}'.format( FieldName( i_sParadoxName ) )
##! Table name as written in Paradox table file may not be unique among
## multiple files in single Paradox folder. Use file name as table name
## for SQLite.
mArgs = {
'name' : re.sub( '(?i)\.db$', '', s_file ).lower(),
'fields' : ", ".join( [ FieldName( o.name ) for o in o_db.fields ] ),
'values' : ", ".join( [ FieldKey( o.name ) for o in o_db.fields ] )
'name': re.sub( '(?i)\.db$', '', s_file ).lower(),
'fields': ", ".join( [ FieldName( o.name ) for o in o_db.fields ] ),
'values': ", ".join( [ FieldKey( o.name ) for o in o_db.fields ] )
}
lSignatures = []
for i, oField in enumerate( o_db.fields ) :
for i, oField in enumerate( o_db.fields ):
sName = FieldName( oField.name )
##! Paradox autoincrement field starts from 1, while for SQLite it
## starts from 0 and adding first item with 1 will raise an error.
## As workaround, use non-autoincrement field for SQLite.
if pyparadox.CField.AUTOINCREMENT == oField.type :
if pyparadox.CField.AUTOINCREMENT == oField.type:
sSignature = "{0} INTEGER".format( sName )
else :
else:
sSignature = "{0} {1}".format( sName, oField.toSqliteType() )
lSignatures.append( sSignature )
mArgs[ 'signature' ] = ", ".join( lSignatures )
Expand All @@ -152,47 +152,47 @@ def FieldKey( i_sParadoxName ) :
sQuery = "INSERT INTO {name} ({fields}) VALUES ({values})"
sQuery = sQuery.format( ** mArgs )
mArgs = {}
for i, oField in enumerate( o_db.fields ) :
for i, oField in enumerate( o_db.fields ):
uField = o_record.fields[ i ]
lUnsupported = [ datetime.time, datetime.date, datetime.datetime ]
if str == type( uField ) :
if str == type( uField ):
uField = uField.decode( 'cp1251' )
if type( uField ) in lUnsupported :
if type( uField ) in lUnsupported:
## SQLite don't have time types, use |ISO 8601| string.
uField = uField.isoformat()
mArgs[ FieldName( oField.name ) ] = uField
o_conn.execute( sQuery, mArgs )


def shutdown( self ):
  """Signals the worker thread to leave its main loop."""
  self._shutdown_f = True
  ##! The event is set only after the flag, so a woken waiter always
  ## observes the flag already raised (prevents races).
  self._shutdown_o.set()


@classmethod
def instance( cls ):
  """Lazily creates and returns the process-wide singleton |Worker|."""
  if cls._instance_o is None:
    cls._instance_o = Worker()
  return cls._instance_o


def cfgChanged( self ):
  """Marks the configuration as dirty; |run()| reloads tasks on its
  next iteration.
  """
  self._cfgCHanged_f = True


def results( self ):
  """Returns the mapping of task name to its latest status message."""
  return self._results_m


def timeReloadLast( self ):
  """Returns time of the last configuration reload (as set by |run()|),
  or |None| if no reload has happened yet.
  """
  return self._timeReloadLast_o


class Server( SimpleXMLRPCServer, object ) :
class Server( SimpleXMLRPCServer, object ):


def __init__( self, n_port ) :
def __init__( self, n_port ):
gAddr = ( 'localhost', n_port )
SimpleXMLRPCServer.__init__( self, gAddr, logRequests = False )
self._shutdown_f = False
Expand All @@ -201,28 +201,28 @@ def __init__( self, n_port ) :
self.register_function( self.cfgChanged )


def serve_forever( self, ** _ ):
  """Serves XML-RPC requests one at a time until |stop()| raises the
  shutdown flag. Keyword arguments of the base implementation are
  accepted and ignored.
  """
  while True:
    if self._shutdown_f:
      break
    self.handle_request()


def stop( self ):
  """XML-RPC endpoint: asks the server loop to exit.

  Returns True so the RPC reply marshals cleanly.
  """
  self._shutdown_f = True
  return True


def status( self ):
  """XML-RPC endpoint: builds a human-readable daemon status report."""
  oWorker = Worker.instance()
  ## NOTE(review): assumes at least one reload happened by now —
  ## |strftime| would raise on a |None| reload time; confirm with callers.
  sMsg = """Daemon is running.
\tConfiguration reloaded: {0}""".format(
    time.strftime( '%Y.%m.%d %H:%M:%S', oWorker.timeReloadLast() ) )
  mResults = oWorker.results()
  for sName in sorted( mResults.keys() ):
    sMsg += "\n{0}:\n\t {1}".format( sName, mResults[ sName ] )
  ## Collapse space runs first, then turn layout tabs into single spaces.
  return re.sub( '\t', ' ', re.sub( ' +', ' ', sMsg ) )


def cfgChanged( self ):
  """XML-RPC endpoint: forwards a config-changed notification to the
  worker. Returns True so the RPC reply marshals cleanly.
  """
  Worker.instance().cfgChanged()
  return True

Expand All @@ -233,11 +233,11 @@ def cfgChanged( self ) :
oArgs = oParser.parse_args()

Worker.instance().start()
try :
try:
Server( oArgs.port ).serve_forever()
except socket.error :
except socket.error:
## Unable to bind to port if already started.
pass
finally :
finally:
Worker.instance().shutdown()

Loading

0 comments on commit 164c658

Please sign in to comment.