Skip to content

Commit

Permalink
Removed debug logs. advertiseToNetwork() replaced watchTable().
Browse files Browse the repository at this point in the history
Debug logs that were helpful when squashing bugs have been removed.

advertiseToNetwork replaced the watchTable which originally watched the
routing table entries. We now take a different approach to propagating
the local registry services into the network registry.
  • Loading branch information
milosgajdos committed Jun 19, 2019
1 parent d3525eb commit 59035ab
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 121 deletions.
173 changes: 67 additions & 106 deletions router/default_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,19 @@ import (
"sync"
"time"

"github.com/micro/go-log"
"github.com/micro/go-micro/registry"
"github.com/olekukonko/tablewriter"
)

var (
// AdvertiseToNetworkTick defines how often in seconds do we scal the local registry
// to advertise the local services to the network registry
AdvertiseToNetworkTick = 5 * time.Second
// AdvertiseNetworkTTL defines network registry TTL in seconds
// NOTE: this is a rather arbitrary picked value subject to change
AdvertiseNetworkTTL = 120 * time.Second
)

type router struct {
opts Options
exit chan struct{}
Expand Down Expand Up @@ -85,56 +93,46 @@ func (r *router) Start() error {
return fmt.Errorf("failed adding routes for network services: %v", err)
}

// routing table has been bootstrapped;
// NOTE: we only need to advertise local services upstream
// lookup local service routes and advertise them upstream
query := NewQuery(QueryNetwork("local"))
localRoutes, err := r.opts.Table.Lookup(query)
if err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to lookup local service routes: %v", err)
}

node, err := r.parseToNode()
if err != nil {
return fmt.Errorf("failed to parse router into service node: %v", err)
}

for _, route := range localRoutes {
service := &registry.Service{
Name: route.Options().DestAddr,
Nodes: []*registry.Node{node},
}
if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil {
return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err)
}
}

localWatcher, err := r.opts.LocalRegistry.Watch()
localRegWatcher, err := r.opts.LocalRegistry.Watch()
if err != nil {
return fmt.Errorf("failed to create local registry watcher: %v", err)
}

networkWatcher, err := r.opts.NetworkRegistry.Watch()
networkRegWatcher, err := r.opts.NetworkRegistry.Watch()
if err != nil {
return fmt.Errorf("failed to create network registry watcher: %v", err)
}

// NOTE: we only watch local netwrork entries which we then propagate upstream to network
tableWatcher, err := r.opts.Table.Watch(WatchNetwork("local"))
if err != nil {
return fmt.Errorf("failed to create routing table watcher: %v", err)
}
// error channel collecting goroutine errors
errChan := make(chan error, 3)

r.wg.Add(1)
go r.manageServiceRoutes(localWatcher, "local", DefaultLocalMetric)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routine table
errChan <- r.manageServiceRoutes(localRegWatcher, "local", DefaultLocalMetric)
}()

r.wg.Add(1)
go r.manageServiceRoutes(networkWatcher, r.opts.NetworkAddress, DefaultNetworkMetric)
go func() {
defer r.wg.Done()
// watch network registry and register routes in routine table
errChan <- r.manageServiceRoutes(networkRegWatcher, r.opts.NetworkAddress, DefaultNetworkMetric)
}()

r.wg.Add(1)
go r.watchTable(tableWatcher)
go func() {
defer r.wg.Done()
// watch local registry and advertise local service to the network
errChan <- r.advertiseToNetwork(node)
}()

return nil
return <-errChan
}

// addServiceRouteslists all available services in given registry and adds them to the routing table.
Expand Down Expand Up @@ -182,11 +180,40 @@ func (r *router) parseToNode() (*registry.Node, error) {
return node, nil
}

// advertiseToNetwork periodically scans local registry and registers (i.e. advertises) all the local services in the network registry.
// It returns error if either the local services failed to be listed or if it fails to register local service in network registry.
func (r *router) advertiseToNetwork(node *registry.Node) error {
// ticker to periodically scan the local registry
ticker := time.NewTicker(AdvertiseToNetworkTick)

for {
select {
case <-r.exit:
return nil
case <-ticker.C:
// list all local services
services, err := r.opts.LocalRegistry.ListServices()
if err != nil {
return fmt.Errorf("failed to list local services: %v", err)
}
// loop through all registered local services and register them in the network registry
for _, service := range services {
svc := &registry.Service{
Name: service.Name,
Nodes: []*registry.Node{node},
}
// register the local service in the network registry
if err := r.opts.NetworkRegistry.Register(svc, registry.RegisterTTL(AdvertiseNetworkTTL)); err != nil {
return fmt.Errorf("failed to register service %s in network registry: %v", svc.Name, err)
}
}
}
}
}

// manageServiceRoutes watches services in given registry and updates the routing table accordingly.
// It returns error if the service registry watcher has stopped or if the routing table failed to be updated.
func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric int) error {
defer r.wg.Done()

// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
Expand All @@ -198,7 +225,6 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric

var watchErr error

// watch for changes to services
for {
res, err := w.Next()
if err == registry.ErrWatcherStopped {
Expand All @@ -207,7 +233,6 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric

if err != nil {
watchErr = err
log.Logf("[router] registry error: %s", err)
break
}

Expand All @@ -220,92 +245,33 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric

switch res.Action {
case "create":
log.Logf("[router] received <%s> create event for service %s", network, res.Service.Name)
if len(res.Service.Nodes) > 0 {
log.Logf("[router] adding <%s> service %s to routing table", network, res.Service.Name)
/// only return error if the route is not duplicate, but something else has failed
if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed to add route for service: %v", res.Service.Name)
}
log.Logf("[router] route successfully added; routing table: \n%s", r.opts.Table)
}
case "delete":
log.Logf("[router] received <%s> delete event for service %s", network, res.Service.Name)
//log.Logf("[router] <%s> service nodes: %v", network, res.Service.Nodes)
if len(res.Service.Nodes) < 1 {
log.Logf("[router] removing <%s> service %s from routing table", network, res.Service.Name)
// only return error if the route is present in the table, but something else has failed
if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to delete route for service: %v", res.Service.Name)
}
log.Logf("[router] route successfully deleted; routing table: \n%s", r.opts.Table)
}
}
}

return watchErr
}

// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w Watcher) error {
defer r.wg.Done()

r.wg.Add(1)
go func() {
defer r.wg.Done()
<-r.exit
w.Stop()
}()

var watchErr error

// watch for changes to services
for {
event, err := w.Next()
if err == ErrWatcherStopped {
break
}

if err != nil {
watchErr = err
log.Logf("[router] routing table error: %s", err)
break
}

node, err := r.parseToNode()
if err != nil {
return fmt.Errorf("failed to parse router into node: %v", err)
}

// we know that .DestAddr contains the registered service name
service := &registry.Service{
Name: event.Route.Options().DestAddr,
Nodes: []*registry.Node{node},
}

switch event.Type {
case CreateEvent:
log.Logf("[router] adding service %s to network registry", event.Route.Options().DestAddr)
//if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil {
if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(5*time.Second)); err != nil {
return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err)
}
log.Logf("[router] successfully added service %s to network registry", event.Route.Options().DestAddr)
case DeleteEvent:
log.Logf("[router] deleting service %s from network registry", event.Route.Options().DestAddr)
if err := r.opts.NetworkRegistry.Deregister(service); err != nil {
return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err)
}
log.Logf("[router] successfully deleted service %s from network registry", event.Route.Options().DestAddr)
}
}

return watchErr
}

// Stop stops the router
func (r *router) Stop() error {
// notify all goroutines to finish
close(r.exit)

// wait for all goroutines to finish
r.wg.Wait()

// NOTE: we need a more efficient way of doing this e.g. network routes
// should ideally be autodeleted when the router stops gossiping
// deregister all services advertised by this router from remote registry
Expand All @@ -315,6 +281,7 @@ func (r *router) Stop() error {
return fmt.Errorf("failed to lookup routes for router %s: %v", r.opts.ID, err)
}

// parse router to registry.Node
node, err := r.parseToNode()
if err != nil {
return fmt.Errorf("failed to parse router into service node: %v", err)
Expand All @@ -330,12 +297,6 @@ func (r *router) Stop() error {
}
}

// notify all goroutines to finish
close(r.exit)

// wait for all goroutines to finish
r.wg.Wait()

return nil
}

Expand Down
31 changes: 17 additions & 14 deletions router/default_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"sync"

"github.com/google/uuid"
"github.com/micro/go-log"
"github.com/olekukonko/tablewriter"
)

// TODO: table options TBD in the future
// TableOptions are routing table options
// TODO: table options TBD in the future
type TableOptions struct{}

// table is in memory routing table
Expand Down Expand Up @@ -71,11 +70,8 @@ func (t *table) Add(r Route) error {
t.Lock()
defer t.Unlock()

log.Logf("[table] AddRoute request %d %s: \n%s", sum, r.Options().Policy, r)

// check if the destination has any routes in the table
if _, ok := t.m[destAddr]; !ok {
log.Logf("[table] destination does NOT exist ADDING: \n%s", r)
t.m[destAddr] = make(map[uint64]Route)
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r})
Expand All @@ -84,15 +80,13 @@ func (t *table) Add(r Route) error {

// add new route to the table for the given destination
if _, ok := t.m[destAddr][sum]; !ok {
log.Logf("[table] route does NOT exist ADDING: \n%s", r)
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r})
return nil
}

// only add the route if it exists and if override is requested
// only add the route if the route override is explicitly requested
if _, ok := t.m[destAddr][sum]; ok && r.Options().Policy == OverrideIfExists {
log.Logf("[table] route does exist OVERRIDING: \n%s", r)
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: UpdateEvent, Route: r})
return nil
Expand All @@ -101,12 +95,9 @@ func (t *table) Add(r Route) error {
// if we reached this point without already returning the route already exists
// we return nil only if explicitly requested by the client
if r.Options().Policy == IgnoreIfExists {
log.Logf("[table] route does exist IGNORING: \n%s", r)
return nil
}

log.Logf("[table] AddRoute request: DUPPLICATE ROUTE")

return ErrDuplicateRoute
}

Expand All @@ -118,10 +109,7 @@ func (t *table) Delete(r Route) error {
destAddr := r.Options().DestAddr
sum := t.hash(r)

log.Logf("[table] DeleteRoute request %d: \n%s", sum, r)

if _, ok := t.m[destAddr]; !ok {
log.Logf("[table] DeleteRoute Route NOT found: %s", r)
return ErrRouteNotFound
}

Expand Down Expand Up @@ -154,6 +142,21 @@ func (t *table) Update(r Route) error {
return ErrRouteNotFound
}

// List returns a list of all routes in the table
func (t *table) List() ([]Route, error) {
t.RLock()
defer t.RUnlock()

var routes []Route
for _, rmap := range t.m {
for _, route := range rmap {
routes = append(routes, route)
}
}

return routes, nil
}

// Lookup queries routing table and returns all routes that match it
func (t *table) Lookup(q Query) ([]Route, error) {
t.RLock()
Expand Down
2 changes: 2 additions & 0 deletions router/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Table interface {
Delete(Route) error
// Update updates route in the routing table
Update(Route) error
// List returns the list of all routes in the table
List() ([]Route, error)
// Lookup looks up routes in the routing table and returns them
Lookup(Query) ([]Route, error)
// Watch returns a watcher which allows to track updates to the routing table
Expand Down
Loading

0 comments on commit 59035ab

Please sign in to comment.