Skip to content

Commit

Permalink
add statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
clowwindy committed Aug 5, 2015
1 parent e08845d commit 9c3af61
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 14 deletions.
33 changes: 28 additions & 5 deletions shadowsocks/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from shadowsocks import common, eventloop, tcprelay, udprelay, asyncdns, shell


BUF_SIZE = 2048
BUF_SIZE = 1506
STAT_SEND_LIMIT = 100


class Manager(object):
Expand All @@ -44,6 +45,7 @@ def __init__(self, config):
self._statistics = collections.defaultdict(int)
self._control_client_addr = None
try:
# TODO use address instead of port
self._control_socket.bind(('127.0.0.1',
int(config['manager_port'])))
self._control_socket.setblocking(False)
Expand All @@ -53,6 +55,7 @@ def __init__(self, config):
exit(1)
self._loop.add(self._control_socket,
eventloop.POLL_IN, self)
self._loop.add_periodic(self.handle_periodic)

port_password = config['port_password']
del config['port_password']
Expand All @@ -70,8 +73,10 @@ def add_port(self, config):
port))
return
logging.info("adding server at %s:%d" % (config['server'], port))
t = tcprelay.TCPRelay(config, self._dns_resolver, False)
u = udprelay.UDPRelay(config, self._dns_resolver, False)
t = tcprelay.TCPRelay(config, self._dns_resolver, False,
self.stat_callback)
u = udprelay.UDPRelay(config, self._dns_resolver, False,
self.stat_callback)
t.add_to_loop(self._loop)
u.add_to_loop(self._loop)
self._relays[port] = (t, u)
Expand Down Expand Up @@ -126,9 +131,27 @@ def _parse_command(self, data):
logging.error(e)
return None

def stat_callback(self, port, data_len):
self._statistics[port] += data_len

def handle_periodic(self):
# TODO send statistics
pass
r = {}
i = 0

def send_data(data_dict):
if data_dict:
data = common.to_bytes(json.dumps(data_dict,
separators=(',', ':')))
self._send_control_data(b'stat: ' + data)

for k, v in self._statistics.items():
r[k] = v
i += 1
if i >= STAT_SEND_LIMIT:
send_data(r)
r.clear()
send_data(r)
self._statistics.clear()

def _send_control_data(self, data):
if self._control_client_addr:
Expand Down
18 changes: 11 additions & 7 deletions shadowsocks/tcprelay.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ def _get_a_server(self):
logging.debug('chosen server: %s:%d', server, server_port)
return server, server_port

def _update_activity(self):
def _update_activity(self, data_len=0):
# tell the TCP Relay we have activities recently
# else it will think we are inactive and timed out
self._server.update_activity(self)
self._server.update_activity(self, data_len)

def _update_stream(self, stream, status):
# update a stream to a new waiting status
Expand Down Expand Up @@ -317,7 +317,6 @@ def _handle_stage_addr(self, data):
self._log_error(e)
if self._config['verbose']:
traceback.print_exc()
# TODO use logging when debug completed
self.destroy()

def _create_remote_socket(self, ip, port):
Expand Down Expand Up @@ -388,7 +387,6 @@ def _handle_dns_resolved(self, result, error):
def _on_local_read(self):
# handle all local read events and dispatch them to methods for
# each stage
self._update_activity()
if not self._local_sock:
return
is_local = self._is_local
Expand All @@ -402,6 +400,7 @@ def _on_local_read(self):
if not data:
self.destroy()
return
self._update_activity(len(data))
if not is_local:
data = self._encryptor.decrypt(data)
if not data:
Expand All @@ -424,17 +423,18 @@ def _on_local_read(self):

def _on_remote_read(self):
# handle all remote read events
self._update_activity()
data = None
try:
data = self._remote_sock.recv(BUF_SIZE)

except (OSError, IOError) as e:
if eventloop.errno_from_exception(e) in \
(errno.ETIMEDOUT, errno.EAGAIN, errno.EWOULDBLOCK):
return
if not data:
self.destroy()
return
self._update_activity(len(data))
if self._is_local:
data = self._encryptor.decrypt(data)
else:
Expand Down Expand Up @@ -549,7 +549,7 @@ def destroy(self):


class TCPRelay(object):
def __init__(self, config, dns_resolver, is_local):
def __init__(self, config, dns_resolver, is_local, stat_callback=None):
self._config = config
self._is_local = is_local
self._dns_resolver = dns_resolver
Expand Down Expand Up @@ -589,6 +589,7 @@ def __init__(self, config, dns_resolver, is_local):
self._config['fast_open'] = False
server_socket.listen(1024)
self._server_socket = server_socket
self._stat_callback = stat_callback

def add_to_loop(self, loop):
if self._eventloop:
Expand All @@ -607,7 +608,10 @@ def remove_handler(self, handler):
self._timeouts[index] = None
del self._handler_to_timeouts[hash(handler)]

def update_activity(self, handler):
def update_activity(self, handler, data_len):
if data_len and self._stat_callback:
self._stat_callback(self._listen_port, data_len)

# set handler to active
now = int(time.time())
if now - handler.last_activity < eventloop.TIMEOUT_PRECISION:
Expand Down
8 changes: 6 additions & 2 deletions shadowsocks/udprelay.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def client_key(source_addr, server_af):


class UDPRelay(object):
def __init__(self, config, dns_resolver, is_local):
def __init__(self, config, dns_resolver, is_local, stat_callback=None):
self._config = config
if is_local:
self._listen_addr = config['local_address']
Expand Down Expand Up @@ -121,6 +121,7 @@ def __init__(self, config, dns_resolver, is_local):
server_socket.bind((self._listen_addr, self._listen_port))
server_socket.setblocking(False)
self._server_socket = server_socket
self._stat_callback = stat_callback

def _get_a_server(self):
server = self._config['server']
Expand All @@ -146,6 +147,8 @@ def _handle_server(self):
data, r_addr = server.recvfrom(BUF_SIZE)
if not data:
logging.debug('UDP handle_server: data is empty')
if self._stat_callback:
self._stat_callback(self._listen_port, len(data))
if self._is_local:
frag = common.ord(data[2])
if frag != 0:
Expand Down Expand Up @@ -181,7 +184,6 @@ def _handle_server(self):

af, socktype, proto, canonname, sa = addrs[0]
key = client_key(r_addr, af)
logging.debug(key)
client = self._cache.get(key, None)
if not client:
# TODO async getaddrinfo
Expand Down Expand Up @@ -221,6 +223,8 @@ def _handle_client(self, sock):
if not data:
logging.debug('UDP handle_client: data is empty')
return
if self._stat_callback:
self._stat_callback(self._listen_port, len(data))
if not self._is_local:
addrlen = len(r_addr[0])
if addrlen > 255:
Expand Down

0 comments on commit 9c3af61

Please sign in to comment.