forked from tflowers/bwlogtools
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathXSLog.py
143 lines (119 loc) · 4.52 KB
/
XSLog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/python
import re
from datetime import datetime
from itertools import groupby
class XSLogEntry(object):
_siplogfmt = re.compile(r'^(?:udp|tcp)(?:\ )'
+'(?:[0-9]+\ Bytes\ )'
+'(?P<direction>IN|OUT)(?:\ )'
+'(?:to|from)(?:\ )'
+'(?:(?P<ipaddr>.*)(?::)(?P<port>.*)\r\n)'
+'(?P<sipmsg>(?:.*\r\n)+)', re.M)
_timestampfmt = re.compile(r'(?P<year>[0-9]{4})(?:\.)'
+'(?P<month>[0-9]{2})(?:\.)'
+'(?P<day>[0-9]{2})(?:\ )'
+'(?P<hour>[0-9]{2})(?::)'
+'(?P<min>[0-9]{2})(?::)'
+'(?P<sec>[0-9]{2})\:'
+'(?P<msec>[0-9]{3})(?:\ )'
+'(?P<tz>[A-Z]{3})$')
def __init__(self, datetime=None, loglevel=None, logtype=None, body=None):
self.datetime = self.convert_timestamp(datetime)
self.loglevel = loglevel
self.logtype = logtype
self.body = body
def __repr__(self):
line = ''
for x in range(80):
line += '-'
line += '\n'
repr_str = '\n' + line + str(self.datetime)
repr_str += " " + self.loglevel + " " + self.logtype #+ ":" + "\n"
#repr_str += line + "\n" + self.body
repr_str += " " + self.direction + " " + self.ipaddr + ":" + self.port
repr_str += "\nFrom:" + self.headers['From']
repr_str += "\nTo:" + self.headers['To']
repr_str += "\nVia:" + self.headers['Via']
repr_str += "\nDiversion:" + (self.headers['Diversion'] if 'Diversion' in self.headers else 'none')
return repr_str
def type(self):
return self.__class__.__name__
def convert_timestamp(self, timestr):
match = self._timestampfmt.match(timestr)
if not match:
return False
ts = match.groupdict()
#convert timestamp entries to int HACK
for key in ts:
if key != 'tz':
ts[key] = int(ts[key])
return datetime(ts['year'], ts['month'], ts['day'],
ts['hour'], ts['min'], ts['sec'],
ts['msec'] * 1000 )
def parse_headers(self, sipmsg):
headers = dict()
for line in sipmsg.split("\n"):
l = line.split(":", 1)
if len(l) > 1 and l[0].find('=') == -1:
headers[l[0]] = l[1]
return headers
@staticmethod
def factory(rawlog):
logline, body = rawlog
entries = [entry.strip() for entry in logline.split('|')]
datetime, loglevel, logtype = entries[:3]
match = XSLogEntry._siplogfmt.match(body)
if match:
return SipXSLogEntry(datetime, loglevel, logtype, body, match.groupdict())
else:
return GenericXSLogEntry(datetime, loglevel, logtype, body)
class SipXSLogEntry(XSLogEntry):
def __init__(self, datetime=None, loglevel=None,
logtype=None, body=None, siplog=None):
super(SipXSLogEntry, self).__init__(datetime, loglevel, logtype, body)
self.sipmsg = siplog['sipmsg'] + "\r\n"
self.direction = siplog['direction']
self.ipaddr = siplog['ipaddr']
self.port = siplog['port']
self.headers = self.parse_headers(siplog['sipmsg'])
class GenericXSLogEntry(XSLogEntry):
def __init__(self, datetime=None, loglevel=None, logtype=None, body=None):
super(GenericXSLogEntry, self).__init__(datetime, loglevel, logtype, body)
class XSLog(object):
"""
XSLog Parses Broadworks XSLog files into a list of logs
"""
_logstart = re.compile(r'^[0-9]{4}\.[0-9]{2}\.[0-9]{2}')
def __init__(self, fn):
self.logs = self.parser(fn)
def __iter__(self):
for log in self.logs:
yield log
def __getitem__(self, key):
return self.logs[key]
def siplogs( self, filterfunc=None ):
siplogs = [log for log in self.logs if log.type() == 'SipXSLogEntry']
if filterfunc != None:
return filter( filterfunc, siplogs )
else:
return siplogs
def parser(self, fn):
groups = []
keys = []
try:
f = open(fn, 'r')
tmp = f.next()
while not self._logstart.match(tmp):
tmp = f.next()
keys.append(tmp)
for key, group in groupby(f, self._logstart.match):
if key: keys.append(list(group))
else: groups.append(list(group))
finally:
f.close()
#This assumes that the parser gets a group entry for each key,
#this may be error prone, but so far seems to work.
keys = ["".join(k).strip() for k in keys]
groups = ["".join(g).strip() for g in groups]
rawlogs = zip(keys, groups)
return [XSLogEntry.factory(rl) for rl in rawlogs]