-
-
Notifications
You must be signed in to change notification settings - Fork 143
/
Copy pathpluginsloader.py
162 lines (126 loc) · 5.48 KB
/
pluginsloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/python3
import os
import sys
import inspect
from io import StringIO
import csv
from lib.proxylogger import ProxyLogger
#
# Plugin that attempts to load all of the supplied plugins from
# program launch options.
class PluginsLoader:
class InjectedLogger(ProxyLogger):
def __init__(self, name, options = None):
self.name = name
super().__init__(options)
def _text(self, txt):
return '[{}] {}'.format(self.name, txt)
# Info shall be used as an ordinary logging facility, for every desired output.
def info(self, txt, forced = False, **kwargs):
super().info(self._text(txt), forced, **kwargs)
# Trace by default does not uses [TRACE] prefix. Shall be used
# for dumping packets, headers, metadata and longer technical output.
def trace(self, txt, **kwargs):
super().trace(self._text(txt), **kwargs)
def dbg(self, txt, **kwargs):
super().dbg(self._text(txt), **kwargs)
def err(self, txt, **kwargs):
super().err(self._text(txt), **kwargs)
def fatal(self, txt, **kwargs):
super().fatal(self._text(txt), **kwargs)
def __init__(self, logger, options, instantiate = True):
self.options = options
self.plugins = {}
self.called = False
self.logger = logger
self.instantiate = instantiate
plugins_count = len(self.options['plugins'])
if plugins_count > 0:
self.logger.info('Loading %d plugin%s...' % (plugins_count, '' if plugins_count == 1 else 's'))
for plugin in self.options['plugins']:
self.load(plugin)
self.called = True
# Output format:
# plugins = {'plugin1': instance, 'plugin2': instance, ...}
def get_plugins(self):
return self.plugins
#
# Following function parses input plugin path with parameters and decomposes
# them to extract plugin's arguments along with it's path.
# For instance, having such string:
# -p "plugins/my_plugin.py",argument1="test",argument2,argument3=test2
#
# It will return:
# {'path':'plugins/my_plugin.py', 'argument1':'t,e,s,t', 'argument2':'', 'argument3':'test2'}
#
@staticmethod
def decompose_path(p):
decomposed = {}
f = StringIO(p)
rows = list(csv.reader(f, quoting=csv.QUOTE_ALL, skipinitialspace=True))
for i in range(len(rows[0])):
row = rows[0][i]
if i == 0:
decomposed['path'] = row
continue
if '=' in row:
s = row.split('=')
decomposed[s[0]] = s[1].replace('"', '')
else:
decomposed[row] = ''
return decomposed
def load(self, path):
instance = None
self.logger.dbg('Plugin string: "%s"' % path)
decomposed = PluginsLoader.decompose_path(path)
self.logger.dbg('Decomposed as: %s' % str(decomposed))
plugin = decomposed['path'].strip()
if not os.path.isfile(plugin):
_plugin = os.path.normpath(os.path.join(os.path.dirname(__file__), '../plugins/{}'.format(plugin)))
if os.path.isfile(_plugin):
plugin = _plugin
elif os.path.isfile(_plugin+'.py'):
plugin = _plugin + '.py'
name = os.path.basename(plugin).lower().replace('.py', '')
if name in self.plugins or name in ['iproxyplugin', '__init__']:
# Plugin already loaded.
return
self.logger.dbg('Attempting to load plugin: %s ("%s")...' % (name, plugin))
try:
sys.path.append(os.path.dirname(plugin))
__import__(name)
module = sys.modules[name]
self.logger.dbg('Module imported.')
try:
handler = getattr(module, self.options['plugin_class_name'])
found = False
for base in inspect.getmro(handler):
if base.__name__ == 'IProxyPlugin':
found = True
break
if not found:
raise TypeError('Plugin does not inherit from IProxyPlugin.')
# Call plugin's __init__ with the `logger' instance passed to it.
if self.instantiate:
instance = handler(PluginsLoader.InjectedLogger(name), self.options)
else:
instance = handler
self.logger.dbg('Found class "%s".' % self.options['plugin_class_name'])
except AttributeError as e:
self.logger.err('Plugin "%s" loading has failed: "%s".' %
(name, self.options['plugin_class_name']))
self.logger.err('\tError: %s' % e)
if self.options['debug']:
raise
except TypeError as e:
self.logger.err('Plugin "{}" instantiation failed due to interface incompatibility.'.format(name))
raise
if not instance:
self.logger.err('Didn\'t find supported class in module "%s"' % name)
else:
self.plugins[name] = instance
self.logger.info('Plugin "%s" has been installed.' % name)
except ImportError as e:
self.logger.err('Couldn\'t load specified plugin: "%s". Error: %s' % (plugin, e))
if self.options['debug']:
raise