Skip to content

Commit

Permalink
feat: set peer state to running when scope size is SizeScope_TINY (dr…
Browse files Browse the repository at this point in the history
…agonflyoss#1004)

* chore: sync docker-compose scheduler config (dragonflyoss#1001)

Signed-off-by: Jim Ma <[email protected]>
Signed-off-by: Gaius <[email protected]>

* feat: set peer state to running when scope size is SizeScope_TINY

Signed-off-by: Gaius <[email protected]>

Co-authored-by: Jim Ma <[email protected]>
  • Loading branch information
gaius-qi and jim3ma authored Jan 18, 2022
1 parent 060d892 commit 8aecb35
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 35 deletions.
6 changes: 3 additions & 3 deletions scheduler/resource/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ const (
)

const (
// Peer is downloading
PeerEventDownload = "Download"

// Peer is registered as tiny scope size
PeerEventRegisterTiny = "RegisterTiny"

Expand All @@ -75,6 +72,9 @@ const (
// Peer is registered as normal scope size
PeerEventRegisterNormal = "RegisterNormal"

// Peer is downloading
PeerEventDownload = "Download"

// Peer is downloading from back-to-source
PeerEventDownloadFromBackToSource = "DownloadFromBackToSource"

Expand Down
19 changes: 19 additions & 0 deletions scheduler/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ func (s *Server) RegisterPeerTask(ctx context.Context, req *scheduler.PeerTaskRe
return nil, dferr
}

// Dfdaemon does not report piece info when scope size is SizeScope_TINY
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
}

return &scheduler.RegisterResult{
TaskId: task.ID,
SizeScope: sizeScope,
Expand All @@ -96,6 +103,12 @@ func (s *Server) RegisterPeerTask(ctx context.Context, req *scheduler.PeerTaskRe
parent, ok := s.service.Scheduler().FindParent(ctx, peer, set.NewSafeSet())
if !ok {
peer.Log.Warn("task size scope is small and it can not select parent")
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
}

return &scheduler.RegisterResult{
TaskId: task.ID,
SizeScope: sizeScope,
Expand All @@ -105,6 +118,12 @@ func (s *Server) RegisterPeerTask(ctx context.Context, req *scheduler.PeerTaskRe
firstPiece, ok := task.LoadPiece(0)
if !ok {
peer.Log.Warn("task size scope is small and it can not get first piece")
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
}

return &scheduler.RegisterResult{
TaskId: task.ID,
SizeScope: sizeScope,
Expand Down
35 changes: 21 additions & 14 deletions scheduler/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
name string
req *rpcscheduler.PeerTaskRequest
mock func(req *rpcscheduler.PeerTaskRequest, mockPeer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, scheduler scheduler.Scheduler, ms *mocks.MockServiceMockRecorder, msched *schedulermocks.MockSchedulerMockRecorder)
expect func(t *testing.T, result *rpcscheduler.RegisterResult, err error)
expect func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error)
}{
{
name: "service register failed",
req: &rpcscheduler.PeerTaskRequest{},
mock: func(req *rpcscheduler.PeerTaskRequest, mockPeer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, scheduler scheduler.Scheduler, ms *mocks.MockServiceMockRecorder, msched *schedulermocks.MockSchedulerMockRecorder) {
ms.RegisterTask(context.Background(), req).Return(nil, errors.New("foo"))
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
dferr, ok := err.(*dferrors.DfError)
assert.True(ok)
Expand All @@ -122,7 +122,7 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_NORMAL)
Expand All @@ -141,7 +141,7 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
dferr, ok := err.(*dferrors.DfError)
assert.True(ok)
Expand All @@ -160,7 +160,7 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_NORMAL)
Expand All @@ -179,7 +179,7 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
dferr, ok := err.(*dferrors.DfError)
assert.True(ok)
Expand All @@ -199,13 +199,14 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_TINY)
assert.Equal(result.DirectPiece, &rpcscheduler.RegisterResult_PieceContent{
PieceContent: []byte{1},
})
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
Expand All @@ -222,10 +223,11 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
msched.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_TINY)
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
},
},
{
Expand All @@ -243,10 +245,11 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
msched.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_SMALL)
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
},
},
{
Expand All @@ -265,10 +268,11 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
msched.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_SMALL)
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
},
},
{
Expand All @@ -290,7 +294,7 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
msched.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_SMALL)
Expand All @@ -303,6 +307,7 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
},
},
})
assert.True(peer.FSM.Is(resource.PeerStateReceivedSmall))
},
},
{
Expand All @@ -319,10 +324,11 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_NORMAL)
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
},
},
{
Expand All @@ -340,11 +346,12 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
)
},
expect: func(t *testing.T, result *rpcscheduler.RegisterResult, err error) {
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
dferr, ok := err.(*dferrors.DfError)
assert.True(ok)
assert.Equal(dferr.Code, base.Code_SchedError)
assert.True(peer.FSM.Is(resource.PeerStateFailed))
},
},
}
Expand All @@ -362,7 +369,7 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
tc.mock(tc.req, mockPeer, mockHost, mockTask, scheduler, svc.EXPECT(), scheduler.EXPECT())
svr := New(svc)
result, err := svr.RegisterPeerTask(context.Background(), tc.req)
tc.expect(t, result, err)
tc.expect(t, mockPeer, result, err)
})
}
}
Expand Down
8 changes: 0 additions & 8 deletions scheduler/service/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,6 @@ func (c *callback) BeginOfPiece(ctx context.Context, peer *resource.Peer) {
// Back to the source download process, peer directly returns
peer.Log.Info("peer back to source")
return
case resource.PeerStateReceivedTiny:
// When the task is tiny,
// the peer data has already returned to the parent when registering
peer.Log.Info("file type is tiny, peer data has already returned to the parent when registering")
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
return
}
case resource.PeerStateReceivedSmall:
// When the task is small,
// the peer has already returned to the parent when registering
Expand Down
10 changes: 0 additions & 10 deletions scheduler/service/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,6 @@ func TestCallback_BeginOfPiece(t *testing.T) {
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
},
},
{
name: "peer state is PeerStateReceivedTiny",
mock: func(peer *resource.Peer, scheduler *mocks.MockSchedulerMockRecorder) {
peer.FSM.SetState(resource.PeerStateReceivedTiny)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "peer state is PeerStateReceivedSmall",
mock: func(peer *resource.Peer, scheduler *mocks.MockSchedulerMockRecorder) {
Expand Down

0 comments on commit 8aecb35

Please sign in to comment.