diff --git a/logging.cfg.template b/doc/logging.cfg.template similarity index 100% rename from logging.cfg.template rename to doc/logging.cfg.template diff --git a/pox.py b/pox.py index a22d95e7b..8b66ce122 100755 --- a/pox.py +++ b/pox.py @@ -334,11 +334,15 @@ def setup_logging(log_config="logging.cfg", fail_if_non_existent=False): logging.config.fileConfig(log_config) else: if fail_if_non_existent: - raise IOError("Could not find logging config file: %s" % log_config) - - _default_log_handler = logging.StreamHandler() - _default_log_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) - logging.getLogger().addHandler(_default_log_handler) + raise IOError("Could not find logging config file: %s" % (log_config,)) + + # This is kind of a hack, but we need to keep track of the handler we + # install so that we can, for example, uninstall it later. This code + # originally lived in pox.core, so we explicitly reference it here. + pox.core._default_log_handler = logging.StreamHandler() + formatter = logging.Formatter(logging.BASIC_FORMAT) + pox.core._default_log_handler.setFormatter(formatter) + logging.getLogger().addHandler(pox.core._default_log_handler) logging.getLogger().setLevel(logging.DEBUG) def main (): diff --git a/pox/lib/ioworker/io_worker.py b/pox/lib/ioworker/io_worker.py index f00085d64..1b308fe80 100644 --- a/pox/lib/ioworker/io_worker.py +++ b/pox/lib/ioworker/io_worker.py @@ -25,6 +25,7 @@ def __init__(self): self._on_data_receive = lambda worker: None def set_receive_handler(self, block): + """ Cause us to call the given block whenever data is ready to be read """ self._on_data_receive = block def send(self, data): @@ -32,28 +33,34 @@ def send(self, data): assert_type("data", data, [bytes], none_ok=False) self.send_buf += data - def push_receive_data(self, new_data): - """ notify client of new received data. called by a Select loop """ + def _push_receive_data(self, new_data): + # notify client of new received data. called by a Select loop self.receive_buf += new_data self._on_data_receive(self) def peek_receive_buf(self): + """ Grab the receive buffer. Don't modify it! """ return self.receive_buf def consume_receive_buf(self, l): - """ called from the client to consume receive buffer """ + """ Consume receive buffer """ + # called from the client assert(len(self.receive_buf) >= l) self.receive_buf = self.receive_buf[l:] @property - def ready_to_send(self): + def _ready_to_send(self): + # called by Select loop return len(self.send_buf) > 0 - def consume_send_buf(self, l): + def _consume_send_buf(self, l): + # Throw out the first l bytes of the send buffer + # Called by Select loop assert(len(self.send_buf)>=l) self.send_buf = self.send_buf[l:] def close(self): + """ Close this socket """ pass class RecocoIOWorker(IOWorker): @@ -62,20 +69,25 @@ def __init__(self, socket, pinger, on_close): IOWorker.__init__(self) self.socket = socket self.pinger = pinger + # (on_close factory method hides details of the Select loop) self.on_close = on_close def fileno(self): + """ Return the wrapped sockets' fileno """ return self.socket.fileno() def send(self, data): + """ send data from the client side. fire and forget. """ IOWorker.send(self, data) self.pinger.ping() def close(self): + """ Register this socket to be closed. fire and forget """ + # (don't close until Select loop is ready) IOWorker.close(self) # on_close is a function not a method self.on_close(self) - + class RecocoIOLoop(Task): """ recoco task that handles the actual IO for our IO workers @@ -85,27 +97,37 @@ class RecocoIOLoop(Task): def __init__ (self): Task.__init__(self) - self.workers = set() + self._workers = set() self.pinger = makePinger() - # socket.close() must be performed by this Select task -- otherwise - # we'll end up blocking on socket that doesn't exist. - self.pending_worker_closes = [] + # socket.open() and socket.close() are performed by this Select task + # other threads register open() and close() requests by adding lambdas + # to this thread-safe queue. + self._pending_commands = Queue.Queue() def create_worker_for_socket(self, socket): + ''' + Return an IOWorker wrapping the given socket. + ''' + # Called from external threads. + # Does not register the IOWorker immediately with the select loop -- + # rather, adds a command to the pending queue + + # Our callback for io_worker.close(): def on_close(worker): - ''' callback for io_worker.close() ''' - self.pending_worker_closes.append(worker) + def close_worker(worker): + # Actually close the worker (called by Select loop) + worker.socket.close() + self._workers.discard(worker) + # schedule close_worker to be called by Select loop + self._pending_commands.put(lambda: close_worker(worker)) self.pinger.ping() + worker = RecocoIOWorker(socket, pinger=self.pinger, on_close=on_close) - self.workers.add(worker) + # Don't add immediately, since we're in the wrong thread + self._pending_commands.put(lambda: self._workers.add(worker)) self.pinger.ping() return worker - - def _close_worker(self, worker): - ''' only called by our Select task ''' - worker.socket.close() - self.workers.discard(worker) - + def stop(self): self.running = False self.pinger.ping() @@ -114,15 +136,14 @@ def run (self): self.running = True while self.running: try: - # First, close and pending sockets - for io_worker in self.pending_worker_closes: - self._close_worker(io_worker) - self.pending_socket_closes = [] + # First, execute pending commands + while not self._pending_commands.empty(): + self._pending_commands.get()() - # Now grab remaining workers - read_sockets = list(self.workers) + [ self.pinger ] - write_sockets = [ worker for worker in self.workers if worker.ready_to_send ] - exception_sockets = list(self.workers) + # Now grab workers + read_sockets = list(self._workers) + [ self.pinger ] + write_sockets = [ worker for worker in self._workers if worker._ready_to_send ] + exception_sockets = list(self._workers) rlist, wlist, elist = yield Select(read_sockets, write_sockets, exception_sockets, self._select_timeout) @@ -133,28 +154,28 @@ def run (self): for worker in elist: worker.close() - if worker in self.workers: - self.workers.remove(worker) + if worker in self._workers: + self._workers.remove(worker) for worker in rlist: try: data = worker.socket.recv(self._BUF_SIZE) - worker.push_receive_data(data) + worker._push_receive_data(data) except socket.error as (s_errno, strerror): log.error("Socket error: " + strerror) worker.close() - self.workers.discard(worker) + self._workers.discard(worker) for worker in wlist: try: l = worker.socket.send(worker.send_buf) if l > 0: - worker.consume_send_buf(l) + worker._consume_send_buf(l) except socket.error as (s_errno, strerror): if s_errno != errno.EAGAIN: log.error("Socket error: " + strerror) worker.close() - self.workers.discard(worker) + self._workers.discard(worker) except exceptions.KeyboardInterrupt: break diff --git a/pox/samples/pretty_log.py b/pox/samples/pretty_log.py new file mode 100644 index 000000000..ef4796077 --- /dev/null +++ b/pox/samples/pretty_log.py @@ -0,0 +1,31 @@ +# Copyright 2012 James McCauley +# +# This file is part of POX. +# +# POX is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# POX is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with POX. If not, see . + +""" +This is a very simple component which provides some kind of nice +log formatting. + +It demonstrates launching another component (there should eventually +be a nice interface for doing this), and formatting color log messages. +""" + +def launch (): + import pox.log.color + pox.log.color.launch() + import pox.log + pox.log.launch(format="[@@@bold@@@level%(name)-22s@@@reset] " + + "@@@bold%(message)s@@@normal") diff --git a/setup.cfg b/tests/setup.cfg similarity index 100% rename from setup.cfg rename to tests/setup.cfg diff --git a/tests/unit/lib/ioworker/io_worker_test.py b/tests/unit/lib/ioworker/io_worker_test.py index 7e48d9b3e..61388464f 100644 --- a/tests/unit/lib/ioworker/io_worker_test.py +++ b/tests/unit/lib/ioworker/io_worker_test.py @@ -16,10 +16,10 @@ class IOWorkerTest(unittest.TestCase): def test_basic_send(self): i = IOWorker() i.send("foo") - self.assertTrue(i.ready_to_send) + self.assertTrue(i._ready_to_send) self.assertEqual(i.send_buf, "foo") - i.consume_send_buf(3) - self.assertFalse(i.ready_to_send) + i._consume_send_buf(3) + self.assertFalse(i._ready_to_send) def test_basic_receive(self): i = IOWorker() @@ -27,10 +27,10 @@ def test_basic_receive(self): def d(worker): self.data = worker.peek_receive_buf() i.set_receive_handler(d) - i.push_receive_data("bar") + i._push_receive_data("bar") self.assertEqual(self.data, "bar") # d does not consume the data - i.push_receive_data("hepp") + i._push_receive_data("hepp") self.assertEqual(self.data, "barhepp") def test_receive_consume(self): @@ -40,10 +40,10 @@ def consume(worker): self.data = worker.peek_receive_buf() worker.consume_receive_buf(len(self.data)) i.set_receive_handler(consume) - i.push_receive_data("bar") + i._push_receive_data("bar") self.assertEqual(self.data, "bar") # data has been consumed - i.push_receive_data("hepp") + i._push_receive_data("hepp") self.assertEqual(self.data, "hepp") @@ -88,13 +88,14 @@ def test_run_close(self): (left, right) = MockSocket.pair() worker = loop.create_worker_for_socket(left) - self.assertTrue(worker in loop.workers) + self.assertFalse(worker in loop._workers, "Should not add to _workers yet, until we start up the loop") + self.assertTrue(loop._pending_commands.qsize() == 1, "Should have added pending create() command") worker.close() # This causes the worker to be scheduled to be closed -- it also # calls pinger.ping(). However, the Select task won't receive the ping # Until after this method has completed! Thus, we only test whether # worker has been added to the pending close queue - self.assertTrue(worker in loop.pending_worker_closes) + self.assertTrue(loop._pending_commands.qsize() == 2, "Should have added pending close() command") def test_run_write(self): loop = RecocoIOLoop()