Skip to content

Commit

Permalink
Move and rename NOMFlowTable
Browse files Browse the repository at this point in the history
This is really a topology entity and doesn't belong in
openflow.flow_table.  It now lives in topology as
OFSyncFlowTable.
  • Loading branch information
MurphyMc committed Feb 24, 2013
1 parent 01b9ddb commit bea9e66
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 259 deletions.
142 changes: 3 additions & 139 deletions pox/openflow/flow_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
Implementation of an OpenFlow flow table
"""

from collections import namedtuple
from libopenflow_01 import *
from pox.lib.revent import *

Expand Down Expand Up @@ -119,12 +118,14 @@ def flow_stats(self, now=None):
actions = self.actions
)


class FlowTableModification (Event):
def __init__(self, added=[], removed=[]):
Event.__init__(self)
self.added = added
self.removed = removed


class FlowTable (EventMixin):
_eventMixin_events = set([FlowTableModification])

Expand Down Expand Up @@ -213,6 +214,7 @@ def entry_for_packet(self, packet, in_port):
else:
return None


class SwitchFlowTable(FlowTable):
"""
Model a flow table for our switch implementation. Handles the behavior in response
Expand Down Expand Up @@ -252,141 +254,3 @@ def process_flow_mod(self, flow_mod):
return ("removed", self.remove_matching_entries(flow_mod.match, flow_mod.priority, strict=True))
else:
raise AttributeError("Command not yet implemented: %s" % flow_mod.command)

class NOMFlowTable(EventMixin):
_eventMixin_events = set([FlowTableModification])
"""
Model a flow table for use in our NOM model. Keep in sync with a switch through a
connection.
"""
ADD = OFPFC_ADD
REMOVE = OFPFC_DELETE
REMOVE_STRICT = OFPFC_DELETE_STRICT
TIME_OUT = 2

def __init__(self, switch=None, **kw):
EventMixin.__init__(self)
self.flow_table = FlowTable()
self.switch = switch

# a list of pending flow table entries : tuples (ADD|REMOVE, entry)
self._pending = []

# a map of pending barriers barrier_xid-> ([entry1,entry2])
self._pending_barrier_to_ops = {}
# a map of pending barriers per request entry -> (barrier_xid, time)
self._pending_op_to_barrier = {}

self.listenTo(switch)

def install(self, entries=[]):
""" asynchronously install entries in the flow table. will raise a FlowTableModification event when
the change has been processed by the switch """
self._mod(entries, NOMFlowTable.ADD)

def remove_with_wildcards(self, entries=[]):
""" asynchronously remove entries in the flow table. will raise a FlowTableModification event when
the change has been processed by the switch """
self._mod(entries, NOMFlowTable.REMOVE)

def remove_strict(self, entries=[]):
""" asynchronously remove entries in the flow table. will raise a FlowTableModification event when
the change has been processed by the switch """
self._mod(entries, NOMFlowTable.REMOVE_STRICT)

@property
def entries(self):
return self.flow_table.entries

@property
def num_pending(self):
return len(self._pending)

def __len__(self):
return len(self.flow_table)

def _mod(self, entries, command):
if isinstance(entries, TableEntry):
entries = [ entries ]

for entry in entries:
if(command == NOMFlowTable.REMOVE):
self._pending = filter(lambda(command, pentry): not (command == NOMFlowTable.ADD and entry.matches_with_wildcards(pentry)), self._pending)
elif(command == NOMFlowTable.REMOVE_STRICT):
self._pending = filter(lambda(command, pentry): not (command == NOMFlowTable.ADD and entry == pentry), self._pending)

self._pending.append( (command, entry) )

self._sync_pending()

def _sync_pending(self, clear=False):
if not self.switch.connected:
return False

# resync the switch
if clear:
self._pending_barrier_to_ops = {}
self._pending_op_to_barrier = {}
self._pending = filter(lambda(op): op[0] == NOMFlowTable.ADD, self._pending)

self.switch.send(ofp_flow_mod(command=OFPFC_DELETE, match=ofp_match()))
self.switch.send(ofp_barrier_request())

todo = map(lambda(e): (NOMFlowTable.ADD, e), self.flow_table.entries) + self._pending
else:
todo = [ op for op in self._pending
if op not in self._pending_op_to_barrier or (self._pending_op_to_barrier[op][1] + NOMFlowTable.TIME_OUT) < time.time() ]

for op in todo:
fmod_xid = self.switch._xid_generator()
flow_mod = op[1].to_flow_mod(xid=fmod_xid, command=op[0], flags=op[1].flags | OFPFF_SEND_FLOW_REM)
self.switch.send(flow_mod)

barrier_xid = self.switch._xid_generator()
self.switch.send(ofp_barrier_request(xid=barrier_xid))
now = time.time()
self._pending_barrier_to_ops[barrier_xid] = todo

for op in todo:
self._pending_op_to_barrier[op] = (barrier_xid, now)

def _handle_SwitchConnectionUp(self, event):
# sync all_flows
self._sync_pending(clear=True)

def _handle_SwitchConnectionDown(self, event):
# connection down. too bad for our unconfirmed entries
self._pending_barrier_to_ops = {}
self._pending_op_to_barrier = {}

def _handle_BarrierIn(self, barrier):
# yeah. barrier in. time to sync some of these flows
if barrier.xid in self._pending_barrier_to_ops:
added = []
removed = []
#print "barrier in: pending for barrier: %d: %s" % (barrier.xid, self._pending_barrier_to_ops[barrier.xid])
for op in self._pending_barrier_to_ops[barrier.xid]:
(command, entry) = op
if(command == NOMFlowTable.ADD):
self.flow_table.add_entry(entry)
added.append(entry)
else:
removed.extend(self.flow_table.remove_matching_entries(entry.match, entry.priority, strict=command == NOMFlowTable.REMOVE_STRICT))
#print "op: %s, pending: %s" % (op, self._pending)
if op in self._pending: self._pending.remove(op)
self._pending_op_to_barrier.pop(op, None)
del self._pending_barrier_to_ops[barrier.xid]
self.raiseEvent(FlowTableModification(added = added, removed=removed))
return EventHalt
else:
return EventContinue

def _handle_FlowRemoved(self, event):
""" process a flow removed event -- remove the matching flow from the table. """
flow_removed = event.ofp
for entry in self.flow_table.entries:
if(flow_removed.match == entry.match and flow_removed.priority == entry.priority):
self.flow_table.remove_entry(entry)
self.raiseEvent(FlowTableModification(removed=[entry]))
return EventHalt
return EventContinue
169 changes: 167 additions & 2 deletions pox/openflow/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from pox.topology.topology import *
from pox.openflow.discovery import *
from pox.openflow.libopenflow_01 import xid_generator
from pox.openflow.flow_table import NOMFlowTable
from pox.openflow.flow_table import FlowTable,FlowTableModification,TableEntry
from pox.lib.util import dpidToStr
from pox.lib.addresses import *

Expand Down Expand Up @@ -185,7 +185,7 @@ def __init__ (self, dpid):
EventMixin.__init__(self)
self.dpid = dpid
self.ports = {}
self.flow_table = NOMFlowTable(self)
self.flow_table = OFSyncFlowTable(self)
self.capabilities = 0
self._connection = None
self._listeners = []
Expand Down Expand Up @@ -295,6 +295,171 @@ def name(self):
return repr(self)


class OFSyncFlowTable (EventMixin):
_eventMixin_events = set([FlowTableModification])
"""
A flow table that keeps in sync with a switch
"""
ADD = of.OFPFC_ADD
REMOVE = of.OFPFC_DELETE
REMOVE_STRICT = of.OFPFC_DELETE_STRICT
TIME_OUT = 2

def __init__ (self, switch=None, **kw):
EventMixin.__init__(self)
self.flow_table = FlowTable()
self.switch = switch

# a list of pending flow table entries : tuples (ADD|REMOVE, entry)
self._pending = []

# a map of pending barriers barrier_xid-> ([entry1,entry2])
self._pending_barrier_to_ops = {}
# a map of pending barriers per request entry -> (barrier_xid, time)
self._pending_op_to_barrier = {}

self.listenTo(switch)

def install (self, entries=[]):
"""
asynchronously install entries in the flow table
will raise a FlowTableModification event when the change has been
processed by the switch
"""
self._mod(entries, OFSyncFlowTable.ADD)

def remove_with_wildcards (self, entries=[]):
"""
asynchronously remove entries in the flow table
will raise a FlowTableModification event when the change has been
processed by the switch
"""
self._mod(entries, OFSyncFlowTable.REMOVE)

def remove_strict (self, entries=[]):
"""
asynchronously remove entries in the flow table.
will raise a FlowTableModification event when the change has been
processed by the switch
"""
self._mod(entries, OFSyncFlowTable.REMOVE_STRICT)

@property
def entries (self):
return self.flow_table.entries

@property
def num_pending (self):
return len(self._pending)

def __len__ (self):
return len(self.flow_table)

def _mod (self, entries, command):
if isinstance(entries, TableEntry):
entries = [ entries ]

for entry in entries:
if(command == OFSyncFlowTable.REMOVE):
self._pending = [(cmd,pentry) for cmd,pentry in self._pending
if not (cmd == OFSyncFlowTable.ADD
and entry.matches_with_wildcards(pentry))]
elif(command == OFSyncFlowTable.REMOVE_STRICT):
self._pending = [(cmd,pentry) for cmd,pentry in self._pending
if not (cmd == OFSyncFlowTable.ADD
and entry == pentry)]

self._pending.append( (command, entry) )

self._sync_pending()

def _sync_pending (self, clear=False):
if not self.switch.connected:
return False

# resync the switch
if clear:
self._pending_barrier_to_ops = {}
self._pending_op_to_barrier = {}
self._pending = filter(lambda(op): op[0] == OFSyncFlowTable.ADD,
self._pending)

self.switch.send(of.ofp_flow_mod(command=of.OFPFC_DELETE,
match=of.ofp_match()))
self.switch.send(of.ofp_barrier_request())

todo = map(lambda(e): (OFSyncFlowTable.ADD, e),
self.flow_table.entries) + self._pending
else:
todo = [op for op in self._pending
if op not in self._pending_op_to_barrier
or (self._pending_op_to_barrier[op][1]
+ OFSyncFlowTable.TIME_OUT) < time.time() ]

for op in todo:
fmod_xid = self.switch._xid_generator()
flow_mod = op[1].to_flow_mod(xid=fmod_xid, command=op[0],
flags=op[1].flags | of.OFPFF_SEND_FLOW_REM)
self.switch.send(flow_mod)

barrier_xid = self.switch._xid_generator()
self.switch.send(of.ofp_barrier_request(xid=barrier_xid))
now = time.time()
self._pending_barrier_to_ops[barrier_xid] = todo

for op in todo:
self._pending_op_to_barrier[op] = (barrier_xid, now)

def _handle_SwitchConnectionUp (self, event):
# sync all_flows
self._sync_pending(clear=True)

def _handle_SwitchConnectionDown (self, event):
# connection down. too bad for our unconfirmed entries
self._pending_barrier_to_ops = {}
self._pending_op_to_barrier = {}

def _handle_BarrierIn (self, barrier):
# yeah. barrier in. time to sync some of these flows
if barrier.xid in self._pending_barrier_to_ops:
added = []
removed = []
#print "barrier in: pending for barrier: %d: %s" % (barrier.xid,
# self._pending_barrier_to_ops[barrier.xid])
for op in self._pending_barrier_to_ops[barrier.xid]:
(command, entry) = op
if(command == OFSyncFlowTable.ADD):
self.flow_table.add_entry(entry)
added.append(entry)
else:
removed.extend(self.flow_table.remove_matching_entries(entry.match,
entry.priority, strict=command == OFSyncFlowTable.REMOVE_STRICT))
#print "op: %s, pending: %s" % (op, self._pending)
if op in self._pending: self._pending.remove(op)
self._pending_op_to_barrier.pop(op, None)
del self._pending_barrier_to_ops[barrier.xid]
self.raiseEvent(FlowTableModification(added = added, removed=removed))
return EventHalt
else:
return EventContinue

def _handle_FlowRemoved (self, event):
"""
process a flow removed event -- remove the matching flow from the table.
"""
flow_removed = event.ofp
for entry in self.flow_table.entries:
if (flow_removed.match == entry.match
and flow_removed.priority == entry.priority):
self.flow_table.remove_entry(entry)
self.raiseEvent(FlowTableModification(removed=[entry]))
return EventHalt
return EventContinue


def launch ():
if not core.hasComponent("openflow_topology"):
core.register("openflow_topology", OpenFlowTopology())
Loading

0 comments on commit bea9e66

Please sign in to comment.