Skip to content

Commit

Permalink
tests: fixed a few tests
Browse files Browse the repository at this point in the history
Also had the event listener implementation simplified
  • Loading branch information
lni committed Oct 11, 2020
1 parent 9f07f28 commit 48822cf
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 150 deletions.
137 changes: 25 additions & 112 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,22 @@ func (e *raftEventListener) ReadIndexDropped(info server.ReadIndexInfo) {
}

type sysEventListener struct {
stopc chan struct{}
events chan server.SystemEvent
userListener raftio.ISystemEventListener
stopc chan struct{}
events chan server.SystemEvent
ul raftio.ISystemEventListener
}

func newSysEventListener(l raftio.ISystemEventListener,
stopc chan struct{}) *sysEventListener {
return &sysEventListener{
stopc: stopc,
events: make(chan server.SystemEvent),
userListener: l,
stopc: stopc,
events: make(chan server.SystemEvent),
ul: l,
}
}

func (l *sysEventListener) Publish(e server.SystemEvent) {
if l.userListener == nil {
if l.ul == nil {
return
}
select {
Expand All @@ -170,37 +170,40 @@ func (l *sysEventListener) Publish(e server.SystemEvent) {
}

func (l *sysEventListener) handle(e server.SystemEvent) {
if l.ul == nil {
return
}
switch e.Type {
case server.NodeHostShuttingDown:
l.handleNodeHostShuttingDown(e)
l.ul.NodeHostShuttingDown()
case server.NodeReady:
l.handleNodeReady(e)
l.ul.NodeReady(getNodeInfo(e))
case server.NodeUnloaded:
l.handleNodeUnloaded(e)
l.ul.NodeUnloaded(getNodeInfo(e))
case server.MembershipChanged:
l.handleMembershipChanged(e)
l.ul.MembershipChanged(getNodeInfo(e))
case server.ConnectionEstablished:
l.handleConnectionEstablished(e)
l.ul.ConnectionEstablished(getConnectionInfo(e))
case server.ConnectionFailed:
l.handleConnectionFailed(e)
l.ul.ConnectionFailed(getConnectionInfo(e))
case server.SendSnapshotStarted:
l.handleSendSnapshotStarted(e)
l.ul.SendSnapshotStarted(getSnapshotInfo(e))
case server.SendSnapshotCompleted:
l.handleSendSnapshotCompleted(e)
l.ul.SendSnapshotCompleted(getSnapshotInfo(e))
case server.SendSnapshotAborted:
l.handleSendSnapshotAborted(e)
l.ul.SendSnapshotAborted(getSnapshotInfo(e))
case server.SnapshotReceived:
l.handleSnapshotReceived(e)
l.ul.SnapshotReceived(getSnapshotInfo(e))
case server.SnapshotRecovered:
l.handleSnapshotRecovered(e)
l.ul.SnapshotRecovered(getSnapshotInfo(e))
case server.SnapshotCreated:
l.handleSnapshotCreated(e)
l.ul.SnapshotCreated(getSnapshotInfo(e))
case server.SnapshotCompacted:
l.handleSnapshotCompacted(e)
l.ul.SnapshotCompacted(getSnapshotInfo(e))
case server.LogCompacted:
l.handleLogCompacted(e)
l.ul.LogCompacted(getEntryInfo(e))
case server.LogDBCompacted:
l.handleLogDBCompacted(e)
l.ul.LogDBCompacted(getEntryInfo(e))
default:
panic("unknown event type")
}
Expand Down Expand Up @@ -236,93 +239,3 @@ func getConnectionInfo(e server.SystemEvent) raftio.ConnectionInfo {
SnapshotConnection: e.SnapshotConnection,
}
}

func (l *sysEventListener) handleNodeHostShuttingDown(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.NodeHostShuttingDown()
}
}

func (l *sysEventListener) handleNodeReady(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.NodeReady(getNodeInfo(e))
}
}

func (l *sysEventListener) handleNodeUnloaded(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.NodeUnloaded(getNodeInfo(e))
}
}

func (l *sysEventListener) handleMembershipChanged(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.MembershipChanged(getNodeInfo(e))
}
}

func (l *sysEventListener) handleConnectionEstablished(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.ConnectionEstablished(getConnectionInfo(e))
}
}

func (l *sysEventListener) handleConnectionFailed(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.ConnectionFailed(getConnectionInfo(e))
}
}

func (l *sysEventListener) handleSendSnapshotStarted(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.SendSnapshotStarted(getSnapshotInfo(e))
}
}

func (l *sysEventListener) handleSendSnapshotCompleted(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.SendSnapshotCompleted(getSnapshotInfo(e))
}
}

func (l *sysEventListener) handleSendSnapshotAborted(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.SendSnapshotAborted(getSnapshotInfo(e))
}
}

func (l *sysEventListener) handleSnapshotReceived(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.SnapshotReceived(getSnapshotInfo(e))
}
}

func (l *sysEventListener) handleSnapshotRecovered(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.SnapshotRecovered(getSnapshotInfo(e))
}
}

func (l *sysEventListener) handleSnapshotCreated(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.SnapshotCreated(getSnapshotInfo(e))
}
}

func (l *sysEventListener) handleSnapshotCompacted(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.SnapshotCompacted(getSnapshotInfo(e))
}
}

func (l *sysEventListener) handleLogCompacted(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.LogCompacted(getEntryInfo(e))
}
}

func (l *sysEventListener) handleLogDBCompacted(e server.SystemEvent) {
if l.userListener != nil {
l.userListener.LogDBCompacted(getEntryInfo(e))
}
}
74 changes: 36 additions & 38 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ func getTestConfig() *config.Config {
}
}

func waitNodeInfoEvent(t *testing.T, f func() []raftio.NodeInfo, count int) {
for i := 0; i < 1000; i++ {
if len(f()) == count {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("failed to get node info event")
}

func waitSnapshotInfoEvent(t *testing.T, f func() []raftio.SnapshotInfo, count int) {
for i := 0; i < 1000; i++ {
if len(f()) == count {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("failed to get snapshot info event")
}

type testSysEventListener struct {
mu sync.Mutex
nodeHostShuttingdown uint64
Expand Down Expand Up @@ -1541,17 +1561,14 @@ func TestNodeHostSyncIOAPIs(t *testing.T) {
if err := nh.StopCluster(1); err != nil {
t.Errorf("failed to stop cluster 2 %v", err)
}
listener, ok := nh.events.sys.userListener.(*testSysEventListener)
listener, ok := nh.events.sys.ul.(*testSysEventListener)
if !ok {
t.Fatalf("failed to get the system event listener")
}
if len(listener.getNodeReady()) != 1 {
t.Errorf("node ready not signalled")
} else {
ni := listener.getNodeReady()[0]
if ni.ClusterID != 1 || ni.NodeID != 1 {
t.Fatalf("incorrect node ready info")
}
waitNodeInfoEvent(t, listener.getNodeReady, 1)
ni := listener.getNodeReady()[0]
if ni.ClusterID != 1 || ni.NodeID != 1 {
t.Fatalf("incorrect node ready info")
}
},
}
Expand Down Expand Up @@ -1611,19 +1628,11 @@ func TestSyncRequestDeleteNode(t *testing.T) {
if err != nil {
t.Errorf("failed to delete node %v", err)
}
listener, ok := nh.events.sys.userListener.(*testSysEventListener)
listener, ok := nh.events.sys.ul.(*testSysEventListener)
if !ok {
t.Fatalf("failed to get the system event listener")
}
retry := 0
for retry < 10000 {
if len(listener.getMembershipChanged()) != 2 {
time.Sleep(time.Millisecond)
retry++
} else {
break
}
}
waitNodeInfoEvent(t, listener.getMembershipChanged, 2)
ni := listener.getMembershipChanged()[1]
if ni.ClusterID != 1 || ni.NodeID != 1 {
t.Fatalf("incorrect node ready info")
Expand Down Expand Up @@ -2120,7 +2129,7 @@ func TestOnDiskSMCanStreamSnapshot(t *testing.T) {
if !snapshotted {
t.Fatalf("failed to take 3 snapshots")
}
listener, ok := nh2.events.sys.userListener.(*testSysEventListener)
listener, ok := nh2.events.sys.ul.(*testSysEventListener)
if !ok {
t.Fatalf("failed to get the system event listener")
}
Expand All @@ -2136,7 +2145,7 @@ func TestOnDiskSMCanStreamSnapshot(t *testing.T) {
if len(listener.getLogCompacted()) == 0 {
t.Fatalf("log compaction not notified")
}
listener, ok = nh1.events.sys.userListener.(*testSysEventListener)
listener, ok = nh1.events.sys.ul.(*testSysEventListener)
if !ok {
t.Fatalf("failed to get the system event listener")
}
Expand Down Expand Up @@ -2660,19 +2669,11 @@ func TestSyncRequestSnapshot(t *testing.T) {
if idx == 0 {
t.Errorf("unexpected index %d", idx)
}
listener, ok := nh.events.sys.userListener.(*testSysEventListener)
listener, ok := nh.events.sys.ul.(*testSysEventListener)
if !ok {
t.Fatalf("failed to get the system event listener")
}
retry := 0
for retry < 10000 {
if len(listener.getSnapshotCreated()) != 1 {
time.Sleep(time.Millisecond)
retry++
} else {
break
}
}
waitSnapshotInfoEvent(t, listener.getSnapshotCreated, 1)
si := listener.getSnapshotCreated()[0]
if si.ClusterID != 1 || si.NodeID != 1 {
t.Fatalf("incorrect created snapshot info")
Expand Down Expand Up @@ -2957,17 +2958,14 @@ func TestSyncRemoveData(t *testing.T) {
if err := nh.SyncRemoveData(ctx, 1, 1); err != nil {
t.Fatalf("sync remove data failed: %v", err)
}
listener, ok := nh.events.sys.userListener.(*testSysEventListener)
listener, ok := nh.events.sys.ul.(*testSysEventListener)
if !ok {
t.Fatalf("failed to get the system event listener")
}
if len(listener.getNodeUnloaded()) != 1 {
t.Errorf("node ready not signalled")
} else {
ni := listener.getNodeUnloaded()[0]
if ni.ClusterID != 1 || ni.NodeID != 1 {
t.Fatalf("incorrect node unloaded info")
}
waitNodeInfoEvent(t, listener.getNodeUnloaded, 1)
ni := listener.getNodeUnloaded()[0]
if ni.ClusterID != 1 || ni.NodeID != 1 {
t.Fatalf("incorrect node unloaded info")
}
},
}
Expand Down

0 comments on commit 48822cf

Please sign in to comment.