Skip to content

Commit

Permalink
feat: define and implement new dfdaemon APIs to make dragonfly2 work …
Browse files Browse the repository at this point in the history
…as a distributed cache (dragonflyoss#1227)

Introduce a new dfcache command which uses dfcache SDK interface to
interactive with dfdaemon and operate on P2P cache system.

For example:
- add a file into cache system
  dfcache import --cid sha256:xxxxxx --tag testtag /path/to/file

- check if a file exists in cache system
  dfcache stat --cid testid --local  # only check local cache
  dfcache stat --cid testid          # check other peers as well

- export/download a file from cache system
  dfcache export --cid testid -O /path/to/output

- delete a file from cache system, both local cache and P2P network
  dfcache delete -i testid -t testtag

Signed-off-by: Gaius <[email protected]>
Signed-off-by: Eryu Guan <[email protected]>

Co-authored-by: Jim Ma <[email protected]>
Co-authored-by: Gaius <[email protected]>
  • Loading branch information
3 people authored Apr 14, 2022
1 parent 0d10ee4 commit a666680
Show file tree
Hide file tree
Showing 53 changed files with 4,305 additions and 237 deletions.
16 changes: 15 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ docker-push-manager: docker-build-manager
.PHONY: docker-push-manager

# Build dragonfly
build: build-cdn build-scheduler build-dfget build-manager
build: build-cdn build-scheduler build-dfget build-dfcache build-manager
.PHONY: build

# Build cdn
Expand All @@ -115,6 +115,18 @@ build-linux-dfget: build-dirs
GOOS=linux GOARCH=amd64 ./hack/build.sh dfget
.PHONY: build-linux-dfget

# Build dfcache
build-dfcache: build-dirs
@echo "Begin to build dfcache."
./hack/build.sh dfcache
.PHONY: build-dfcache

# Build linux dfcache
build-linux-dfcache: build-dirs
@echo "Begin to build linux dfcache."
GOOS=linux GOARCH=amd64 ./hack/build.sh dfcache
.PHONY: build-linux-dfcache

# Build scheduler
build-scheduler: build-dirs
@echo "Begin to build scheduler."
Expand Down Expand Up @@ -322,6 +334,8 @@ help:
@echo "make build-cdn build CDN"
@echo "make build-dfget build dfget"
@echo "make build-dfget-linux build linux dfget"
@echo "make build-dfcache build dfcache"
@echo "make build-dfcache-linux build linux dfcache"
@echo "make build-scheduler build scheduler"
@echo "make build-manager build manager"
@echo "make build-manager-console build manager console"
Expand Down
8 changes: 8 additions & 0 deletions client/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,11 @@ const (
SimpleLocalTaskStoreStrategy = StoreStrategy("io.d7y.storage.v2.simple")
AdvanceLocalTaskStoreStrategy = StoreStrategy("io.d7y.storage.v2.advance")
)

/* dfcache subcommand names */
const (
CmdStat = "stat"
CmdImport = "import"
CmdExport = "export"
CmdDelete = "delete"
)
236 changes: 236 additions & 0 deletions client/config/dfcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import (
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"syscall"
"time"

"github.com/pkg/errors"
"golang.org/x/time/rate"

"d7y.io/dragonfly/v2/cmd/dependency/base"
"d7y.io/dragonfly/v2/internal/dferrors"
"d7y.io/dragonfly/v2/pkg/basic"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
)

type DfcacheConfig = CacheOption

// CacheOption holds all the runtime config information.
type CacheOption struct {
base.Options `yaml:",inline" mapstructure:",squash"`

// Cid content/cache ID
Cid string `yaml:"cid,omitempty" mapstructure:"cid,omitempty"`

// Tag identify task
Tag string `yaml:"tag,omitempty" mapstructure:"tag,omitempty"`

// Timeout operation timeout(second).
Timeout time.Duration `yaml:"timeout,omitempty" mapstructure:"timeout,omitempty"`

// CallSystem optional system name
CallSystem string `yaml:"callSystem,omitempty" mapstructure:"callSystem,omitempty"`

// LogDir is log directory of dfcache.
LogDir string `yaml:"logDir,omitempty" mapstructure:"logDir,omitempty"`

// WorkHome is working directory of dfcache.
WorkHome string `yaml:"workHome,omitempty" mapstructure:"workHome,omitempty"`

// Output full output path for export task
Output string `yaml:"output,omitempty" mapstructure:"output,omitempty"`

// Path full input path for import task
// TODO: change to Input
Path string `yaml:"path,omitempty" mapstructure:"path,omitempty"`

// RateLimit limits export task
RateLimit rate.Limit `yaml:"rateLimit,omitempty" mapstructure:"rateLimit,omitempty"`

// LocalOnly indicates check local cache only
LocalOnly bool `yaml:"localOnly,omitempty" mapstructure:"localOnly,omitempty"`
}

func NewDfcacheConfig() *CacheOption {
return &CacheOption{}
}

func validateCacheStat(cfg *CacheOption) error {
return nil
}

func validateCacheImport(cfg *CacheOption) error {
if err := cfg.checkInput(); err != nil {
return errors.Wrapf(dferrors.ErrInvalidArgument, "input path: %v", err)
}
return nil
}

func ValidateCacheExport(cfg *CacheOption) error {
if err := cfg.checkOutput(); err != nil {
return errors.Wrapf(dferrors.ErrInvalidArgument, "output: %v", err)
}
return nil
}

func ValidateCacheDelete(cfg *CacheOption) error {
return nil
}

func (cfg *CacheOption) Validate(cmd string) error {
// Some common validations
if cfg == nil {
return errors.Wrap(dferrors.ErrInvalidArgument, "runtime config")
}
if cfg.Cid == "" {
return errors.Wrap(dferrors.ErrInvalidArgument, "missing Cid")
}
if stringutils.IsBlank(cfg.Cid) {
return errors.Wrap(dferrors.ErrInvalidArgument, "Cid are all blanks")
}

// cmd specific validations
switch cmd {
case CmdStat:
return validateCacheStat(cfg)
case CmdImport:
return validateCacheImport(cfg)
case CmdExport:
return ValidateCacheExport(cfg)
case CmdDelete:
return ValidateCacheDelete(cfg)
default:
return errors.Wrapf(dferrors.ErrInvalidArgument, "unknown cache subcommand: %s", cmd)
}
}

func ConvertCacheStat(cfg *CacheOption, args []string) error {
return nil
}

func convertCacheImport(cfg *CacheOption, args []string) error {
var err error
if cfg.Path == "" && len(args) > 0 {
cfg.Path = args[0]
}
if cfg.Path == "" {
return errors.Wrap(dferrors.ErrInvalidArgument, "missing input file")
}

if cfg.Path, err = filepath.Abs(cfg.Path); err != nil {
return errors.Wrapf(err, "get absulate path for %s", cfg.Path)
}
return nil
}

func ConvertCacheExport(cfg *CacheOption, args []string) error {
var err error
if cfg.Output == "" && len(args) > 0 {
cfg.Output = args[0]
}
if cfg.Output == "" {
return errors.Wrap(dferrors.ErrInvalidArgument, "missing output file")
}

if cfg.Output, err = filepath.Abs(cfg.Output); err != nil {
return errors.Wrapf(err, "get absulate path for %s", cfg.Output)
}
return nil
}

func ConvertCacheDelete(cfg *CacheOption, args []string) error {
return nil
}

func (cfg *CacheOption) Convert(cmd string, args []string) error {
if cfg == nil {
return errors.Wrap(dferrors.ErrInvalidArgument, "runtime config")
}

switch cmd {
case CmdStat:
return ConvertCacheStat(cfg, args)
case CmdImport:
return convertCacheImport(cfg, args)
case CmdExport:
return ConvertCacheExport(cfg, args)
case CmdDelete:
return ConvertCacheDelete(cfg, args)
default:
return errors.Wrapf(dferrors.ErrInvalidArgument, "unknown cache subcommand: %s", cmd)
}
}

func (cfg *CacheOption) String() string {
js, _ := json.Marshal(cfg)
return string(js)
}

func (cfg *CacheOption) checkInput() error {
stat, err := os.Stat(cfg.Path)
if err != nil {
return errors.Wrapf(err, "stat input path %q", cfg.Path)
}
if stat.IsDir() {
return fmt.Errorf("path[%q] is directory but requires file path", cfg.Path)
}
if err := syscall.Access(cfg.Path, syscall.O_RDONLY); err != nil {
return errors.Wrapf(err, "access %q", cfg.Path)
}
return nil
}

func (cfg *CacheOption) checkOutput() error {
if cfg.Output == "" {
return errors.New("no output file path specified")
}

if !filepath.IsAbs(cfg.Output) {
absPath, err := filepath.Abs(cfg.Output)
if err != nil {
return fmt.Errorf("get absolute path[%s] error: %v", cfg.Output, err)
}
cfg.Output = absPath
}

outputDir, _ := path.Split(cfg.Output)
if err := MkdirAll(outputDir, 0777, basic.UserID, basic.UserGroup); err != nil {
return err
}

f, err := os.Stat(cfg.Output)
if err == nil && f.IsDir() {
return fmt.Errorf("path[%s] is directory but requires file path", cfg.Output)
}

// check permission
for dir := cfg.Output; !stringutils.IsBlank(dir); dir = filepath.Dir(dir) {
if err := syscall.Access(dir, syscall.O_RDWR); err == nil {
break
} else if os.IsPermission(err) || dir == "/" {
return fmt.Errorf("user[%s] path[%s] %v", basic.Username, cfg.Output, err)
}
}
return nil
}
8 changes: 8 additions & 0 deletions client/daemon/peer/peertask_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ func (d *dummySchedulerClient) LeaveTask(ctx context.Context, target *scheduler.
return nil
}

func (d *dummySchedulerClient) StatTask(ctx context.Context, request *scheduler.StatTaskRequest, option ...grpc.CallOption) (*scheduler.Task, error) {
panic("should not call this function")
}

func (d *dummySchedulerClient) AnnounceTask(ctx context.Context, request *scheduler.AnnounceTaskRequest, option ...grpc.CallOption) error {
panic("should not call this function")
}

func (d *dummySchedulerClient) Close() error {
return nil
}
Expand Down
70 changes: 70 additions & 0 deletions client/daemon/peer/peertask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

"github.com/go-http-utils/headers"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -54,6 +55,14 @@ type TaskManager interface {

IsPeerTaskRunning(id string) bool

// Check if the given task exists in P2P network
StatTask(ctx context.Context, taskID string) (*scheduler.Task, error)

// AnnouncePeerTask announces peer task info to P2P network
AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, cid string, urlMeta *base.UrlMeta) error

GetPieceManager() PieceManager

// Stop stops the PeerTaskManager
Stop(ctx context.Context) error
}
Expand Down Expand Up @@ -352,3 +361,64 @@ func (ptm *peerTaskManager) IsPeerTaskRunning(taskID string) bool {
_, ok := ptm.runningPeerTasks.Load(taskID)
return ok
}

func (ptm *peerTaskManager) StatTask(ctx context.Context, taskID string) (*scheduler.Task, error) {
req := &scheduler.StatTaskRequest{
TaskId: taskID,
}

return ptm.schedulerClient.StatTask(ctx, req)
}

func (ptm *peerTaskManager) GetPieceManager() PieceManager {
return ptm.pieceManager
}

func (ptm *peerTaskManager) AnnouncePeerTask(ctx context.Context,
meta storage.PeerTaskMetadata, cid string, urlMeta *base.UrlMeta) error {
log := logger.With("function", "AnnouncePeerTask", "taskID", meta.TaskID, "peerID", meta.PeerID, "CID", cid)

// Check if the given task is completed in local storageManager
if ptm.storageManager.FindCompletedTask(meta.TaskID) == nil {
msg := fmt.Sprintf("task %s not found in local storage", meta.TaskID)
log.Errorf(msg)
return errors.New(msg)
}

// prepare AnnounceTaskRequest
totalPieces, err := ptm.storageManager.GetTotalPieces(ctx, &meta)
if err != nil {
msg := fmt.Sprintf("get total pieces failed: %s", err)
log.Error(msg)
return errors.New(msg)
}
pieceTaskRequest := &base.PieceTaskRequest{
TaskId: meta.TaskID,
DstPid: meta.PeerID,
StartNum: 0,
Limit: uint32(totalPieces),
}
piecePacket, err := ptm.storageManager.GetPieces(ctx, pieceTaskRequest)
if err != nil {
msg := fmt.Sprintf("get pieces info failed: %s", err)
log.Error(msg)
return errors.New(msg)
}
piecePacket.DstAddr = fmt.Sprintf("%s:%d", ptm.host.Ip, ptm.host.DownPort)
req := &scheduler.AnnounceTaskRequest{
TaskId: meta.TaskID,
Cid: cid,
UrlMeta: urlMeta,
PeerHost: ptm.host,
PiecePacket: piecePacket,
}

// Announce peer task to scheduler
err = ptm.schedulerClient.AnnounceTask(ctx, req)
if err != nil {
msg := fmt.Sprintf("announce peer task failed: %s", err)
log.Error(msg)
return errors.Wrapf(err, "failed to announce peer task %s", meta.TaskID)
}
return nil
}
Loading

0 comments on commit a666680

Please sign in to comment.