Skip to content

Commit

Permalink
scheduler concurrent bug fix (dragonflyoss#513)
Browse files Browse the repository at this point in the history
Signed-off-by: santong <[email protected]>
  • Loading branch information
244372610 authored Aug 2, 2021
1 parent f98401d commit 59a40f9
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 100 deletions.
8 changes: 7 additions & 1 deletion client/config/peerhost_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package config
import (
"net"

"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"golang.org/x/time/rate"

"d7y.io/dragonfly/v2/client/clientutil"
Expand All @@ -42,7 +43,12 @@ var peerHostConfig = DaemonOption{
GCInterval: clientutil.Duration{Duration: DefaultGCInterval},
KeepStorage: false,
Scheduler: SchedulerOption{
NetAddrs: nil,
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
Addr: "127.0.0.1:8002",
},
},
ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout},
},
Host: HostOption{
Expand Down
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
if getErr != nil {
span.RecordError(getErr)
// fast way to exit retry
if curPeerPacket != pt.peerPacket {
if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId {
pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr,
curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId)
peerPacketChanged = true
Expand All @@ -709,7 +709,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
pt.Errorf("send piece result error: %s, code: %d", peer.PeerId, er)
}
// fast way to exit retry
if curPeerPacket != pt.peerPacket {
if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId {
pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s",
curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId)
peerPacketChanged = true
Expand Down
2 changes: 1 addition & 1 deletion cmd/dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func InitMonitor(verbose bool, pprofPort int, jaeger string) func() {
pprofPort, _ = freeport.GetFreePort()
}

debugAddr := fmt.Sprintf("localhost:%d", pprofPort)
debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, pprofPort)
viewer.SetConfiguration(viewer.WithAddr(debugAddr))

logger.With("pprof", fmt.Sprintf("http://%s/debug/pprof", debugAddr),
Expand Down
7 changes: 5 additions & 2 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
}
if selected {
if client, ok := conn.node2ClientMap.Load(node); ok {
return client.(*candidateClient), nil
return &candidateClient{
node: candidateNode,
Ref: client,
}, nil
}
}
}
Expand All @@ -226,7 +229,7 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
if len(ringNodes) == 0 {
return nil, dferrors.ErrNoCandidateNode
}
candidateNodes := make([]string, 0, 0)
candidateNodes := make([]string, 0)
for _, ringNode := range ringNodes {
candidate := true
for _, exclusiveNode := range exclusiveNodes {
Expand Down
15 changes: 7 additions & 8 deletions pkg/rpc/scheduler/client/peer_packet_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package client
import (
"context"
"io"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
Expand Down Expand Up @@ -56,7 +55,7 @@ func newPeerPacketStream(ctx context.Context, sc *schedulerClient, hashKey strin

pps := &peerPacketStream{
sc: sc,
ctx: ctx,
ctx: context.Background(),
hashKey: hashKey,
ptr: ptr,
opts: opts,
Expand Down Expand Up @@ -126,9 +125,9 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro
if err != nil {
return nil, err
}
timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.RegisterPeerTask(timeCtx, pps.ptr)
//timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//defer cancel()
_, err = client.RegisterPeerTask(pps.ctx, pps.ptr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -198,9 +197,9 @@ func (pps *peerPacketStream) replaceClient(cause error) error {
if err != nil {
return nil, err
}
timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.RegisterPeerTask(timeCtx, pps.ptr)
//timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//defer cancel()
_, err = client.RegisterPeerTask(pps.ctx, pps.ptr)
if err != nil {
return nil, err
}
Expand Down
77 changes: 17 additions & 60 deletions scheduler/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ func (s *state) start() {
s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second)
continue
}
if peer.PacketChan == nil {
logger.Errorf("waitScheduleParentPeerQueue: there is no packet chan associated with peer %s", peer.PeerID)
return
}
peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates)
peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates))
logger.WithTaskAndPeerID(peer.Task.TaskID,
peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because it has scheduled new parent %v", parent)
s.waitScheduleParentPeerQueue.Done(v)
Expand All @@ -104,15 +100,11 @@ func (e startReportPieceResultEvent) apply(s *state) {
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
}
if e.peer.PacketChan == nil {
logger.Errorf("start report piece result: there is no packet chan associated with peer %s", e.peer.PeerID)
return
}
e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parent, candidates)
e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, candidates))
}

func (e startReportPieceResultEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerDownloadPieceSuccessEvent struct {
Expand Down Expand Up @@ -140,17 +132,13 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) {
if oldParent != nil {
candidates = append(candidates, oldParent)
}
if e.peer.PacketChan == nil {
logger.Errorf("peerDownloadPieceSuccessEvent: there is no packet chan with peer %s", e.peer.PeerID)
return
}
// TODO if parentPeer is equal with oldParent, need schedule again ?
e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parentPeer, candidates)
e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parentPeer, candidates))
return
}

func (e peerDownloadPieceSuccessEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerDownloadPieceFailEvent struct {
Expand Down Expand Up @@ -183,15 +171,15 @@ func (e peerDownloadPieceFailEvent) apply(s *state) {
}
}
func (e peerDownloadPieceFailEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerReplaceParentEvent struct {
peer *types.Peer
}

func (e peerReplaceParentEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

func (e peerReplaceParentEvent) apply(s *state) {
Expand Down Expand Up @@ -226,20 +214,13 @@ func (e peerDownloadSuccessEvent) apply(s *state) {
removePeerFromCurrentTree(e.peer, s)
children := s.sched.ScheduleChildren(e.peer)
for _, child := range children {
if child.PacketChan == nil {
logger.Debugf("reportPeerSuccessResult: there is no packet chan with peer %s", e.peer.PeerID)
continue
}
child.PacketChan <- constructSuccessPeerPacket(child, e.peer, nil)
}
if e.peer.PacketChan != nil {
close(e.peer.PacketChan)
e.peer.PacketChan = nil
child.SendSchedulePacket(constructSuccessPeerPacket(child, e.peer, nil))
}
e.peer.UnBindSendChannel()
}

func (e peerDownloadSuccessEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerDownloadFailEvent struct {
Expand All @@ -260,22 +241,14 @@ func (e peerDownloadFailEvent) apply(s *state) {
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return true
}
if child.PacketChan == nil {
logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID)
return true
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates))
return true
})
if e.peer.PacketChan != nil {
close(e.peer.PacketChan)
e.peer.PacketChan = nil
}
s.peerManager.Delete(e.peer.PeerID)
}

func (e peerDownloadFailEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerLeaveEvent struct {
Expand All @@ -289,7 +262,7 @@ func (e peerLeaveEvent) apply(s *state) {
}

func (e peerLeaveEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

func constructSuccessPeerPacket(peer *types.Peer, parent *types.Peer, candidates []*types.Peer) *schedulerRPC.PeerPacket {
Expand Down Expand Up @@ -337,11 +310,7 @@ func handlePeerLeave(peer *types.Peer, s *state) {
s.waitScheduleParentPeerQueue.AddAfter(child, time.Second)
return true
}
if child.PacketChan == nil {
logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID)
return true
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates))
return true
})
s.peerManager.Delete(peer.PeerID)
Expand All @@ -355,22 +324,14 @@ func handleReplaceParent(peer *types.Peer, s *state) {
s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second)
return
}
if peer.PacketChan == nil {
logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID)
return
}
peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates)
peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates))
}

func handleSeedTaskFail(task *types.Task) {
if task.IsFail() {
task.ListPeers().Range(func(data sortedlist.Item) bool {
peer := data.(*types.Peer)
if peer.PacketChan == nil {
logger.Debugf("taskSeedFailEvent: there is no packet chan with peer %s", peer.PeerID)
return true
}
peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.CdnError)
peer.SendSchedulePacket(constructFailPeerPacket(peer, dfcodes.CdnError))
return true
})
}
Expand All @@ -383,11 +344,7 @@ func removePeerFromCurrentTree(peer *types.Peer, s *state) {
if parent != nil {
children := s.sched.ScheduleChildren(parent)
for _, child := range children {
if child.PacketChan == nil {
logger.Debugf("removePeerFromCurrentTree: there is no packet chan with peer %s", peer.PeerID)
continue
}
child.PacketChan <- constructSuccessPeerPacket(child, peer, nil)
child.SendSchedulePacket(constructSuccessPeerPacket(child, peer, nil))
}
}
}
2 changes: 1 addition & 1 deletion scheduler/core/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (m *monitor) printDebugInfo() string {
parentNode = peer.GetParent().PeerID
}

table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)-5], parentNode, peer.GetStatus().String(),
table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)], parentNode, peer.GetStatus().String(),
peer.CreateTime.String(),
strconv.Itoa(int(peer.GetFinishedNum())),
strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())})
Expand Down
3 changes: 3 additions & 0 deletions scheduler/core/scheduler_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task
if task.IsFrozen() {
task.SetStatus(types.TaskStatusRunning)
}
//if s.config.DisableCDN {
// TODO NeedBackSource
//}
go func() {
if err := s.cdnManager.StartSeedTask(ctx, task); err != nil {
if !task.IsSuccess() {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (wg *workerGroup) send(e event) bool {

func (wg *workerGroup) stop() {
close(wg.stopCh)
wg.s.start()
wg.s.stop()
for _, worker := range wg.workerList {
worker.stop()
}
Expand Down
7 changes: 4 additions & 3 deletions scheduler/daemon/cdn/d7y/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *types.Task) error {
}

func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream) error {
var once sync.Once
var initialized bool
var cdnPeer *types.Peer
for {
piece, err := stream.Recv()
Expand All @@ -138,9 +138,10 @@ func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream
return errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err)
}
if piece != nil {
once.Do(func() {
if !initialized {
cdnPeer, err = cm.initCdnPeer(task, piece)
})
initialized = true
}
if err != nil || cdnPeer == nil {
return err
}
Expand Down
6 changes: 1 addition & 5 deletions scheduler/daemon/peer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ func (m *manager) Delete(peerID string) {
if ok {
peer.Host.DeletePeer(peerID)
peer.Task.DeletePeer(peer)
if peer.PacketChan != nil {
close(peer.PacketChan)
logger.Infof("close peer %s stream", peerID)
peer.PacketChan = nil
}
peer.UnBindSendChannel()
m.peerMap.Delete(peerID)
}
return
Expand Down
Loading

0 comments on commit 59a40f9

Please sign in to comment.