-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchains.py
78 lines (61 loc) · 2.88 KB
/
chains.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import re
from collections import namedtuple
from pyptables.base import DebugObject
class AbstractChain(DebugObject, list):
    """Represents an iptables Chain. Holds a number of Rule objects in a list-like fashion.

    The chain *is* a list: its elements are the rules, in order. Each rule
    is expected to provide ``to_iptables(prefix=...)`` returning a string.

    Subclasses must implement ``_chain_definition()``.
    """

    # Return type of to_iptables(): the chain declaration line
    # (header_content) and the rendered rule text (rules).
    Result = namedtuple('ChainResult', 'header_content rules')

    def __init__(self, name, comment=None, rules=()):
        """Create a chain.

        name    -- chain name, used in the declaration and in every '-A' prefix
        comment -- optional free text appended to the generated comment header
        rules   -- iterable of initial rules, forwarded to the base initialisers
        """
        super(AbstractChain, self).__init__(rules)
        self.comment = comment
        self.name = name

    def to_iptables(self):
        """Returns this chain in a format compatible with iptables-restore.

        The result is an ``AbstractChain.Result`` whose ``header_content`` is
        the chain declaration and whose ``rules`` is the comment header
        followed by one line per rule (or ``# No rules`` for an empty chain).

        Any exception raised while rendering is re-raised unchanged, after
        this chain's name has been prepended to its ``iptables_path``
        attribute (created on demand).
        """
        try:
            prefix = '-A %s' % (self.name,)
            if self:
                rule_output = "\n".join(
                    rule.to_iptables(prefix=prefix) for rule in self
                )
            else:
                rule_output = '# No rules'
            return AbstractChain.Result(
                header_content=self._chain_definition(),
                rules="%(comment)s\n%(rules)s" % {
                    'comment': self._comment(),
                    'rules': rule_output,
                },
            )
        except Exception as e:  # pragma: no cover
            # Record which chain the failure passed through; outer callers
            # may prepend their own names to the same list.
            e.iptables_path = getattr(e, 'iptables_path', [])
            e.iptables_path.insert(0, self.name)
            raise

    def _chain_definition(self):
        """Return iptables-restore formatted instruction to create
        the chain (note: rules are added separately)

        Raises NotImplementedError here; subclasses override this.
        """
        # NotImplementedError is the exception class; the NotImplemented
        # constant is not callable and would raise a TypeError instead.
        raise NotImplementedError('Subclasses must define this method')  # pragma: no cover

    def _comment(self):
        """Return the '#' comment header placed above this chain's rules.

        The first line carries the chain type, its name and the debug info
        inherited from the base class; the optional user comment follows on
        a second '#' line.
        """
        # NOTE(review): the trailing '"' after the closing parenthesis looks
        # unintended; it is kept as-is so the generated output is unchanged.
        comment = '# %(type)s "%(name)s" (%(debug)s)"' % {
            'type': self._type_name(),
            'name': self.name,
            'debug': self.debug_info(),
        }
        if self.comment:
            comment = "%s\n# %s" % (comment, self.comment)
        return comment

    def _type_name(self):
        """Return the class name split at capital letters, e.g. 'User Chain'."""
        return " ".join(re.findall(r'[A-Z][^A-Z]*', self.__class__.__name__))

    def __repr__(self):
        """Return a short representation showing at most the first three rules."""
        truncated = [str(i) for i in self[:3]] + (['...'] if len(self) > 3 else [])
        return "<%s: %s - [%s]>" % (self.__class__.__name__, self.name, ", ".join(truncated))
class UserChain(AbstractChain):
    """A chain whose declaration carries '-' in the policy position."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to AbstractChain."""
        super(UserChain, self).__init__(*args, **kwargs)

    def _chain_definition(self):
        """Return the iptables-restore line that declares this chain."""
        fields = dict(name=self.name)
        return ':%(name)s - [0:0]' % fields
class BuiltinChain(AbstractChain):
    """Represents a built-in iptables chain.

    Built-in chains can have a default policy, which is written verbatim
    into the chain declaration.
    """

    def __init__(self, name, policy, *args, **kwargs):
        """Create the chain.

        name and any further arguments go to AbstractChain; policy is
        stored on the instance afterwards.
        """
        super(BuiltinChain, self).__init__(name, *args, **kwargs)
        self.policy = policy

    def _chain_definition(self):
        """Return the iptables-restore line declaring this chain and its policy."""
        fields = dict(name=self.name, policy=self.policy)
        return ':%(name)s %(policy)s [0:0]' % fields