forked from infobyte/faraday
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.py
132 lines (110 loc) · 4.21 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Faraday Penetration Test IDE
Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
'''
import imp
import os
import re
import sys
import traceback
from plugins.controller import PluginController
from config.configuration import getInstanceConfiguration
from utils.logs import getLogger
CONF = getInstanceConfiguration()
class PluginManager(object):
def __init__(self, plugin_repo_path):
self._controllers = {}
self._plugin_modules = {}
self._loadPlugins(plugin_repo_path)
self._plugin_settings = {}
self._loadSettings()
def addController(self, controller, id):
self._controllers[id] = controller
def _loadSettings(self):
_plugin_settings = CONF.getPluginSettings()
if _plugin_settings:
self._plugin_settings = _plugin_settings
activep = self._instancePlugins()
for plugin_id, plugin in activep.iteritems():
if plugin_id in _plugin_settings:
plugin.updateSettings(_plugin_settings[plugin_id]["settings"])
self._plugin_settings[plugin_id] = {
"name": plugin.name,
"description": plugin.description,
"version": plugin.version,
"plugin_version": plugin.plugin_version,
"settings": dict(plugin.getSettings())
}
dplugins = []
for k, v in self._plugin_settings.iteritems():
if k not in activep:
dplugins.append(k)
for d in dplugins:
del self._plugin_settings[d]
CONF.setPluginSettings(self._plugin_settings)
CONF.saveConfig()
def getSettings(self):
return self._plugin_settings
def updateSettings(self, settings):
self._plugin_settings = settings
CONF.setPluginSettings(settings)
CONF.saveConfig()
for plugin_id, params in settings.iteritems():
new_settings = params["settings"]
for c_id, c_instance in self._controllers.iteritems():
c_instance.updatePluginSettings(plugin_id, new_settings)
def _instancePlugins(self):
plugins = {}
for module in self._plugin_modules.itervalues():
new_plugin = module.createPlugin()
self._verifyPlugin(new_plugin)
plugins[new_plugin.id] = new_plugin
return plugins
def _loadPlugins(self, plugin_repo_path):
"""
Finds and load all the plugins that are
available in the plugin_repo_path.
"""
try:
os.stat(plugin_repo_path)
except OSError:
pass
sys.path.append(plugin_repo_path)
dir_name_regexp = re.compile(r"^[\d\w\-\_]+$")
for name in os.listdir(plugin_repo_path):
if dir_name_regexp.match(name):
try:
module_path = os.path.join(plugin_repo_path, name)
sys.path.append(module_path)
module_filename = os.path.join(module_path, "plugin.py")
self._plugin_modules[name] = imp.load_source(
name, module_filename)
except Exception as e:
msg = "An error ocurred while loading plugin %s.\n%s" % (
module_filename, traceback.format_exc())
getLogger(self).debug(msg)
getLogger(self).warn(e)
else:
pass
def getPlugins(self):
plugins = self._instancePlugins()
for id, plugin in plugins.items():
if id in self._plugin_settings:
plugin.updateSettings(self._plugin_settings[id]["settings"])
return plugins
def _verifyPlugin(self, new_plugin):
"""
Generic method that decides is a plugin is valid
based on a predefined set of checks.
"""
try:
assert(new_plugin.id is not None)
assert(new_plugin.version is not None)
assert(new_plugin.name is not None)
assert(new_plugin.framework_version is not None)
except (AssertionError, KeyError):
return False
return True