Skip to content

Commit

Permalink
feat: if cdn is deleted, clear cdn related information (dragonflyoss#967
Browse files Browse the repository at this point in the history
)

* feat: if cdn is deleted, clear cdn related information

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 5, 2022
1 parent c9c3650 commit 40f486b
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 96 deletions.
13 changes: 5 additions & 8 deletions .github/workflows/compatibility-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ name: Compatibility E2E Test

on:
push:
branches: [ main, release-* ]
paths-ignore: [ '**.md', '**.png', '**.jpg', '**.svg', '**/docs/**' ]
branches: [main, release-*]
paths-ignore: ["**.md", "**.png", "**.jpg", "**.svg", "**/docs/**"]
pull_request:
branches: [ main, release-* ]
paths-ignore: [ '**.md', '**.png', '**.jpg', '**.svg', '**/docs/**' ]
branches: [main, release-*]
paths-ignore: ["**.md", "**.png", "**.jpg", "**.svg", "**/docs/**"]

env:
GO_VERSION: 1.17
Expand Down Expand Up @@ -89,7 +89,4 @@ jobs:
with:
name: ${{ matrix.module }}-compatibility-e2e-tests-logs
path: |
/tmp/artifact/manager/*.log
/tmp/artifact/daemon/*.log
/tmp/artifact/scheduler/*.log
/tmp/artifact/cdn/*.log
/tmp/artifact/**/*.log
9 changes: 3 additions & 6 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ name: E2E Test
on:
push:
branches: [main, release-*]
paths-ignore: [ '**.md', '**.png', '**.jpg', '**.svg', '**/docs/**' ]
paths-ignore: ["**.md", "**.png", "**.jpg", "**.svg", "**/docs/**"]
pull_request:
branches: [main, release-*]
paths-ignore: [ '**.md', '**.png', '**.jpg', '**.svg', '**/docs/**' ]
paths-ignore: ["**.md", "**.png", "**.jpg", "**.svg", "**/docs/**"]

env:
GO_VERSION: 1.17
Expand Down Expand Up @@ -78,7 +78,4 @@ jobs:
with:
name: e2e-tests-logs
path: |
/tmp/artifact/manager/*.log
/tmp/artifact/daemon/*.log
/tmp/artifact/scheduler/*.log
/tmp/artifact/cdn/*.log
/tmp/artifact/**/*.log
14 changes: 7 additions & 7 deletions pkg/idgen/host_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestHostID(t *testing.T) {
port: 8000,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Len(d, 8)
assert.Equal(d, "foo-8000")
},
},
{
Expand All @@ -44,7 +44,7 @@ func TestHostID(t *testing.T) {
port: 8000,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Len(d, 5)
assert.Equal(d, "-8000")
},
},
{
Expand All @@ -53,7 +53,7 @@ func TestHostID(t *testing.T) {
port: 0,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Len(d, 5)
assert.Equal(d, "foo-0")
},
},
}
Expand All @@ -78,16 +78,16 @@ func TestCDNHostID(t *testing.T) {
port: 8000,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Len(d, 12)
assert.Equal(d, "foo-8000_CDN")
},
},
{
name: "generate CDNHostID with empty string",
name: "generate CDNHostID with empty host",
hostname: "",
port: 8000,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Len(d, 9)
assert.Equal(d, "-8000_CDN")
},
},
{
Expand All @@ -96,7 +96,7 @@ func TestCDNHostID(t *testing.T) {
port: 0,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Len(d, 9)
assert.Equal(d, "foo-0_CDN")
},
},
}
Expand Down
3 changes: 2 additions & 1 deletion scheduler/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ func (e peerDownloadPieceFailEvent) apply(s *state) {
switch e.pr.Code {
case base.Code_ClientWaitPieceReady:
return
case base.Code_PeerTaskNotFound:
// FIXME: check that the dst peer is healthy before deleting the peer
case base.Code_ClientPieceRequestFail, base.Code_PeerTaskNotFound:
s.peerManager.Delete(e.pr.DstPid)
case base.Code_CDNTaskNotFound, base.Code_CDNError, base.Code_CDNTaskDownloadFail:
s.peerManager.Delete(e.pr.DstPid)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewSchedulerService(cfg *config.SchedulerConfig, pluginDir string, metricsC
if ops.openTel {
opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
}
client, err := supervisor.NewCDNDynmaicClient(dynConfig, opts)
client, err := supervisor.NewCDNDynmaicClient(dynConfig, peerManager, hostManager, opts)
if err != nil {
return nil, errors.Wrap(err, "new refreshable cdn client")
}
Expand Down
57 changes: 49 additions & 8 deletions scheduler/supervisor/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,14 @@ type CDNDynmaicClient interface {

type cdnDynmaicClient struct {
cdnclient.CdnClient
data *config.DynconfigData
hosts map[string]*Host
lock sync.RWMutex
data *config.DynconfigData
hosts map[string]*Host
lock sync.RWMutex
peerManager PeerManager
hostManager HostManager
}

func NewCDNDynmaicClient(dynConfig config.DynconfigInterface, opts []grpc.DialOption) (CDNDynmaicClient, error) {
func NewCDNDynmaicClient(dynConfig config.DynconfigInterface, peerManager PeerManager, hostManager HostManager, opts []grpc.DialOption) (CDNDynmaicClient, error) {
config, err := dynConfig.Get()
if err != nil {
return nil, err
Expand All @@ -270,9 +272,11 @@ func NewCDNDynmaicClient(dynConfig config.DynconfigInterface, opts []grpc.DialOp
}

dc := &cdnDynmaicClient{
CdnClient: client,
data: config,
hosts: cdnsToHosts(config.CDNs),
CdnClient: client,
data: config,
hosts: cdnsToHosts(config.CDNs),
peerManager: peerManager,
hostManager: hostManager,
}

dynConfig.Register(dc)
Expand All @@ -299,8 +303,45 @@ func (dc *cdnDynmaicClient) OnNotify(data *config.DynconfigData) {
dc.lock.Lock()
defer dc.lock.Unlock()

// If cdn is deleted, clear cdn related information
hosts := cdnsToHosts(data.CDNs)
logger.Infof("cdn hosts %#v update to %#v", dc.hosts, hosts)

for _, v := range dc.hosts {
id := idgen.CDNHostID(v.HostName, v.RPCPort)
for _, host := range hosts {
if v.HostName != host.HostName {
continue
}

if v.RPCPort != host.RPCPort {
continue
}

if v.IP == host.IP {
continue
}

v.Log().Info("host has been deleted")
if host, ok := dc.hostManager.Get(id); ok {
host.GetPeers().Range(func(_, value interface{}) bool {
if peer, ok := value.(*Peer); ok {
peer.Log().Info("cdn peer left because cdn host was deleted")
peer.Leave()
}

return true
})
v.Log().Info("delete cdn host from host manager because cdn host was deleted")
dc.hostManager.Delete(id)
} else {
v.Log().Warn("can not found host from host manager")
}
}
}

dc.data = data
dc.hosts = cdnsToHosts(data.CDNs)
dc.hosts = hosts
dc.UpdateState(cdnsToNetAddrs(data.CDNs))
}

Expand Down
4 changes: 4 additions & 0 deletions scheduler/supervisor/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (h *Host) GetPeer(id string) (*Peer, bool) {
return peer.(*Peer), ok
}

// GetPeers returns the host's peers map itself (h.peers), not a copy, so
// callers iterate over and observe the live map. Values are stored as *Peer,
// as the type assertion in GetPeer shows.
func (h *Host) GetPeers() *sync.Map {
return h.peers
}

func (h *Host) GetPeersLen() int {
length := 0
h.peers.Range(func(_, _ interface{}) bool {
Expand Down
76 changes: 76 additions & 0 deletions test/e2e/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package e2e

// Addresses, paths and namespaces shared by the e2e tests.
const (
// proxy is a host:port address on localhost.
// NOTE(review): presumably the dfdaemon proxy endpoint the tests go through — confirm against callers.
proxy = "localhost:65001"
// hostnameFilePath is the path of the file the hostname is read from.
hostnameFilePath = "/etc/hostname"
// dragonflyNamespace is the namespace used by the manager, scheduler, cdn
// and dfdaemon entries in servers.
dragonflyNamespace = "dragonfly-system"
// dragonflyE2ENamespace is the namespace used by the proxy entry in servers.
dragonflyE2ENamespace = "dragonfly-e2e"
)

const (
// dfdaemonCompatibilityTestMode names the "dfdaemon" compatibility test mode.
// NOTE(review): presumably compared with the DRAGONFLY_COMPATIBILITY_E2E_TEST_MODE
// environment variable read in BeforeSuite — confirm.
dfdaemonCompatibilityTestMode = "dfdaemon"
)

// Server names. Each one is both the key of an entry in servers and that
// entry's name field, which AfterSuite uses as the value of the "component"
// pod label selector.
const (
managerServerName = "manager"
schedulerServerName = "scheduler"
cdnServerName = "cdn"
dfdaemonServerName = "dfdaemon"
proxyServerName = "proxy"
)

// server describes one deployed component whose logs AfterSuite collects.
type server struct {
// name is the value matched against the "component" pod label; it is also
// the third argument to e2eutil.NewPodExec and the artifact name prefix.
name string
// namespace is passed to kubectl via "-n" when looking up the pods.
namespace string
// logDirName is the directory under /var/log/dragonfly that gets copied.
logDirName string
// replicas is how many pod indexes (0..replicas-1) AfterSuite visits.
replicas int
}

// servers lists every component AfterSuite gathers logs from, keyed by server
// name. Map iteration order is unspecified, so the collection order varies.
var servers = map[string]server{
managerServerName: {
name: managerServerName,
namespace: dragonflyNamespace,
logDirName: managerServerName,
replicas: 3,
},
schedulerServerName: {
name: schedulerServerName,
namespace: dragonflyNamespace,
logDirName: schedulerServerName,
replicas: 3,
},
cdnServerName: {
name: cdnServerName,
namespace: dragonflyNamespace,
logDirName: cdnServerName,
replicas: 3,
},
// dfdaemon and proxy both use the "daemon" log directory.
dfdaemonServerName: {
name: dfdaemonServerName,
namespace: dragonflyNamespace,
logDirName: "daemon",
replicas: 1,
},
// proxy is the only entry in the e2e namespace.
proxyServerName: {
name: proxyServerName,
namespace: dragonflyE2ENamespace,
logDirName: "daemon",
replicas: 3,
},
}
23 changes: 0 additions & 23 deletions test/e2e/dfget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,6 @@ var _ = Describe("Download with dfget and proxy", func() {
})
})

var _ = AfterSuite(func() {
// copy log files to artifact directory
for i := 0; i < 3; i++ {
out, err := e2eutil.KubeCtlCommand("-n", dragonflyE2ENamespace, "get", "pod", "-l", fmt.Sprintf("statefulset.kubernetes.io/pod-name=proxy-%d", i),
"-o", "jsonpath='{range .items[*]}{.metadata.name}{end}'").CombinedOutput()
if err != nil {
fmt.Printf("get pod error: %s\n", err)
continue
}
podName := strings.Trim(string(out), "'")
pod := e2eutil.NewPodExec(dragonflyE2ENamespace, podName, "proxy")

out, err = pod.Command("sh", "-c", fmt.Sprintf(`
set -x
cp /var/log/dragonfly/daemon/core.log /tmp/artifact/daemon/proxy-%d-core.log
cp /var/log/dragonfly/daemon/grpc.log /tmp/artifact/daemon/proxy-%d-grpc.log
`, i, i)).CombinedOutput()
if err != nil {
fmt.Printf("copy log output: %s, error: %s\n", string(out), err)
}
}
})

func singleDfgetTest(name, ns, label, podNamePrefix, container string) {
It(name, func() {
out, err := e2eutil.KubeCtlCommand("-n", ns, "get", "pod", "-l", label,
Expand Down
57 changes: 48 additions & 9 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package e2e
import (
"fmt"
"os"
"strconv"
"strings"
"testing"

Expand All @@ -29,16 +30,54 @@ import (
_ "d7y.io/dragonfly/v2/test/e2e/manager"
)

const (
proxy = "localhost:65001"
hostnameFilePath = "/etc/hostname"
dragonflyNamespace = "dragonfly-system"
dragonflyE2ENamespace = "dragonfly-e2e"
)
// AfterSuite gathers diagnostics for every replica of every entry in servers
// once the suite has finished. For each replica it looks up the pod name and
// restart count with kubectl, copies the server's log directory to
// /tmp/artifact/<name>-<index> through pod.Command, and uploads the container
// stdout (plus the previous container's stdout when the restart count is
// non-zero). Every failure is only printed; lookup failures skip the replica.
var _ = AfterSuite(func() {
	for _, server := range servers {
		for i := 0; i < server.replicas; i++ {
			// Name of the i-th pod carrying this server's component label.
			out, err := e2eutil.KubeCtlCommand("-n", server.namespace, "get", "pod", "-l", fmt.Sprintf("component=%s", server.name),
				"-o", fmt.Sprintf("jsonpath='{.items[%d].metadata.name}'", i)).CombinedOutput()
			if err != nil {
				fmt.Printf("get pod error: %s\n", err)
				continue
			}
			// Strip the single quotes that surround the jsonpath output.
			podName := strings.Trim(string(out), "'")
			pod := e2eutil.NewPodExec(server.namespace, podName, server.name)

			// Restart count of the pod's first container.
			countOut, err := e2eutil.KubeCtlCommand("-n", server.namespace, "get", "pod", "-l", fmt.Sprintf("component=%s", server.name),
				"-o", fmt.Sprintf("jsonpath='{.items[%d].status.containerStatuses[0].restartCount}'", i)).CombinedOutput()
			if err != nil {
				fmt.Printf("get pod %s restart count error: %s\n", podName, err)
				continue
			}
			rawCount := strings.Trim(string(countOut), "'")
			count, err := strconv.Atoi(rawCount)
			if err != nil {
				fmt.Printf("atoi error: %s\n", err)
				continue
			}
			fmt.Printf("pod %s restart count: %d\n", podName, count)

			// Copy the log directory and open up directory permissions under
			// /tmp/artifact. A failure here is reported but does not stop the
			// stdout uploads below.
			out, err = pod.Command("sh", "-c", fmt.Sprintf(`
set -x
cp -r /var/log/dragonfly/%s /tmp/artifact/%s-%d
find /tmp/artifact -type d -exec chmod 777 {} \;
`, server.logDirName, server.name, i)).CombinedOutput()
			if err != nil {
				fmt.Printf("copy log output: %s, error: %s\n", string(out), err)
			}

			// Only a restarted pod gets a previous-stdout upload.
			if count > 0 {
				if err := e2eutil.UploadArtifactPrevStdout(server.namespace, podName, fmt.Sprintf("%s-%d", server.name, i), server.name); err != nil {
					fmt.Printf("upload pod %s artifact prev stdout file error: %v\n", podName, err)
				}
			}

			if err := e2eutil.UploadArtifactStdout(server.namespace, podName, fmt.Sprintf("%s-%d", server.name, i), server.name); err != nil {
				fmt.Printf("upload pod %s artifact stdout file error: %v\n", podName, err)
			}
		}
	}
})

var _ = BeforeSuite(func() {
mode := os.Getenv("DRAGONFLY_COMPATIBILITY_E2E_TEST_MODE")
Expand Down
Loading

0 comments on commit 40f486b

Please sign in to comment.