Skip to content

Commit

Permalink
Merge branch 'master' of github.com:noxrepo/pox
Browse files Browse the repository at this point in the history
  • Loading branch information
apanda committed Apr 14, 2012
2 parents b4ab569 + 5ae0ad3 commit ae563d5
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 47 deletions.
File renamed without changes.
14 changes: 9 additions & 5 deletions pox.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,15 @@ def setup_logging(log_config="logging.cfg", fail_if_non_existent=False):
logging.config.fileConfig(log_config)
else:
if fail_if_non_existent:
raise IOError("Could not find logging config file: %s" % log_config)

_default_log_handler = logging.StreamHandler()
_default_log_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
logging.getLogger().addHandler(_default_log_handler)
raise IOError("Could not find logging config file: %s" % (log_config,))

# This is kind of a hack, but we need to keep track of the handler we
# install so that we can, for example, uninstall it later. This code
# originally lived in pox.core, so we explicitly reference it here.
pox.core._default_log_handler = logging.StreamHandler()
formatter = logging.Formatter(logging.BASIC_FORMAT)
pox.core._default_log_handler.setFormatter(formatter)
logging.getLogger().addHandler(pox.core._default_log_handler)
logging.getLogger().setLevel(logging.DEBUG)

def main ():
Expand Down
87 changes: 54 additions & 33 deletions pox/lib/ioworker/io_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,42 @@ def __init__(self):
self._on_data_receive = lambda worker: None

def set_receive_handler(self, block):
""" Cause us to call the given block whenever data is ready to be read """
self._on_data_receive = block

def send(self, data):
""" send data from the client side. fire and forget. """
assert_type("data", data, [bytes], none_ok=False)
self.send_buf += data

def push_receive_data(self, new_data):
""" notify client of new received data. called by a Select loop """
def _push_receive_data(self, new_data):
# notify client of new received data. called by a Select loop
self.receive_buf += new_data
self._on_data_receive(self)

def peek_receive_buf(self):
""" Grab the receive buffer. Don't modify it! """
return self.receive_buf

def consume_receive_buf(self, l):
""" called from the client to consume receive buffer """
""" Consume receive buffer """
# called from the client
assert(len(self.receive_buf) >= l)
self.receive_buf = self.receive_buf[l:]

@property
def ready_to_send(self):
def _ready_to_send(self):
# called by Select loop
return len(self.send_buf) > 0

def consume_send_buf(self, l):
def _consume_send_buf(self, l):
# Throw out the first l bytes of the send buffer
# Called by Select loop
assert(len(self.send_buf)>=l)
self.send_buf = self.send_buf[l:]

def close(self):
""" Close this socket """
pass

class RecocoIOWorker(IOWorker):
Expand All @@ -62,20 +69,25 @@ def __init__(self, socket, pinger, on_close):
IOWorker.__init__(self)
self.socket = socket
self.pinger = pinger
# (on_close factory method hides details of the Select loop)
self.on_close = on_close

def fileno(self):
""" Return the wrapped sockets' fileno """
return self.socket.fileno()

def send(self, data):
""" send data from the client side. fire and forget. """
IOWorker.send(self, data)
self.pinger.ping()

def close(self):
""" Register this socket to be closed. fire and forget """
# (don't close until Select loop is ready)
IOWorker.close(self)
# on_close is a function not a method
self.on_close(self)

class RecocoIOLoop(Task):
"""
recoco task that handles the actual IO for our IO workers
Expand All @@ -85,27 +97,37 @@ class RecocoIOLoop(Task):

def __init__ (self):
Task.__init__(self)
self.workers = set()
self._workers = set()
self.pinger = makePinger()
# socket.close() must be performed by this Select task -- otherwise
# we'll end up blocking on socket that doesn't exist.
self.pending_worker_closes = []
# socket.open() and socket.close() are performed by this Select task
# other threads register open() and close() requests by adding lambdas
# to this thread-safe queue.
self._pending_commands = Queue.Queue()

def create_worker_for_socket(self, socket):
'''
Return an IOWorker wrapping the given socket.
'''
# Called from external threads.
# Does not register the IOWorker immediately with the select loop --
# rather, adds a command to the pending queue

# Our callback for io_worker.close():
def on_close(worker):
''' callback for io_worker.close() '''
self.pending_worker_closes.append(worker)
def close_worker(worker):
# Actually close the worker (called by Select loop)
worker.socket.close()
self._workers.discard(worker)
# schedule close_worker to be called by Select loop
self._pending_commands.put(lambda: close_worker(worker))
self.pinger.ping()

worker = RecocoIOWorker(socket, pinger=self.pinger, on_close=on_close)
self.workers.add(worker)
# Don't add immediately, since we're in the wrong thread
self._pending_commands.put(lambda: self._workers.add(worker))
self.pinger.ping()
return worker

def _close_worker(self, worker):
''' only called by our Select task '''
worker.socket.close()
self.workers.discard(worker)


def stop(self):
self.running = False
self.pinger.ping()
Expand All @@ -114,15 +136,14 @@ def run (self):
self.running = True
while self.running:
try:
# First, close and pending sockets
for io_worker in self.pending_worker_closes:
self._close_worker(io_worker)
self.pending_socket_closes = []
# First, execute pending commands
while not self._pending_commands.empty():
self._pending_commands.get()()

# Now grab remaining workers
read_sockets = list(self.workers) + [ self.pinger ]
write_sockets = [ worker for worker in self.workers if worker.ready_to_send ]
exception_sockets = list(self.workers)
# Now grab workers
read_sockets = list(self._workers) + [ self.pinger ]
write_sockets = [ worker for worker in self._workers if worker._ready_to_send ]
exception_sockets = list(self._workers)

rlist, wlist, elist = yield Select(read_sockets, write_sockets,
exception_sockets, self._select_timeout)
Expand All @@ -133,28 +154,28 @@ def run (self):

for worker in elist:
worker.close()
if worker in self.workers:
self.workers.remove(worker)
if worker in self._workers:
self._workers.remove(worker)

for worker in rlist:
try:
data = worker.socket.recv(self._BUF_SIZE)
worker.push_receive_data(data)
worker._push_receive_data(data)
except socket.error as (s_errno, strerror):
log.error("Socket error: " + strerror)
worker.close()
self.workers.discard(worker)
self._workers.discard(worker)

for worker in wlist:
try:
l = worker.socket.send(worker.send_buf)
if l > 0:
worker.consume_send_buf(l)
worker._consume_send_buf(l)
except socket.error as (s_errno, strerror):
if s_errno != errno.EAGAIN:
log.error("Socket error: " + strerror)
worker.close()
self.workers.discard(worker)
self._workers.discard(worker)

except exceptions.KeyboardInterrupt:
break
31 changes: 31 additions & 0 deletions pox/samples/pretty_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2012 James McCauley
#
# This file is part of POX.
#
# POX is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# POX is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with POX. If not, see <http://www.gnu.org/licenses/>.

"""
This is a very simple component which provides some kind of nice
log formatting.
It demonstrates launching another component (there should eventually
be a nice interface for doing this), and formatting color log messages.
"""

def launch ():
import pox.log.color
pox.log.color.launch()
import pox.log
pox.log.launch(format="[@@@bold@@@level%(name)-22s@@@reset] " +
"@@@bold%(message)s@@@normal")
File renamed without changes.
19 changes: 10 additions & 9 deletions tests/unit/lib/ioworker/io_worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ class IOWorkerTest(unittest.TestCase):
def test_basic_send(self):
i = IOWorker()
i.send("foo")
self.assertTrue(i.ready_to_send)
self.assertTrue(i._ready_to_send)
self.assertEqual(i.send_buf, "foo")
i.consume_send_buf(3)
self.assertFalse(i.ready_to_send)
i._consume_send_buf(3)
self.assertFalse(i._ready_to_send)

def test_basic_receive(self):
i = IOWorker()
self.data = None
def d(worker):
self.data = worker.peek_receive_buf()
i.set_receive_handler(d)
i.push_receive_data("bar")
i._push_receive_data("bar")
self.assertEqual(self.data, "bar")
# d does not consume the data
i.push_receive_data("hepp")
i._push_receive_data("hepp")
self.assertEqual(self.data, "barhepp")

def test_receive_consume(self):
Expand All @@ -40,10 +40,10 @@ def consume(worker):
self.data = worker.peek_receive_buf()
worker.consume_receive_buf(len(self.data))
i.set_receive_handler(consume)
i.push_receive_data("bar")
i._push_receive_data("bar")
self.assertEqual(self.data, "bar")
# data has been consumed
i.push_receive_data("hepp")
i._push_receive_data("hepp")
self.assertEqual(self.data, "hepp")


Expand Down Expand Up @@ -88,13 +88,14 @@ def test_run_close(self):
(left, right) = MockSocket.pair()
worker = loop.create_worker_for_socket(left)

self.assertTrue(worker in loop.workers)
self.assertFalse(worker in loop._workers, "Should not add to _workers yet, until we start up the loop")
self.assertTrue(loop._pending_commands.qsize() == 1, "Should have added pending create() command")
worker.close()
# This causes the worker to be scheduled to be closed -- it also
# calls pinger.ping(). However, the Select task won't receive the ping
# Until after this method has completed! Thus, we only test whether
# worker has been added to the pending close queue
self.assertTrue(worker in loop.pending_worker_closes)
self.assertTrue(loop._pending_commands.qsize() == 2, "Should have added pending close() command")

def test_run_write(self):
loop = RecocoIOLoop()
Expand Down

0 comments on commit ae563d5

Please sign in to comment.