Skip to content

Commit

Permalink
feat: replace gin-gonic/gin with gorilla/mux (dragonflyoss#1389)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jun 15, 2022
1 parent eb0ed09 commit b42e2b2
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 92 deletions.
3 changes: 1 addition & 2 deletions cdn/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage"
"d7y.io/dragonfly/v2/cdn/supervisor/progress"
"d7y.io/dragonfly/v2/cdn/supervisor/task"
"d7y.io/dragonfly/v2/client/daemon/upload"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
Expand Down Expand Up @@ -104,7 +103,7 @@ func New(config *config.Config) (*Server, error) {
return nil, errors.Wrap(err, "create rpcServer")
}

fileServer := fileserver.New(config.RPCServer.DownloadPort, upload.PeerDownloadHTTPPathPrefix, storageManager.GetUploadPath())
fileServer := fileserver.New(config.RPCServer.DownloadPort, "/download/", storageManager.GetUploadPath())

// Initialize gc server
gcServer, err := gc.New()
Expand Down
14 changes: 8 additions & 6 deletions client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"sync"
"time"

"github.com/gorilla/mux"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -549,17 +549,19 @@ func (cd *clientDaemon) Serve() error {
if cd.Option.Health.ListenOption.TCPListen == nil {
logger.Fatalf("health listen not found")
}
logger.Infof("serve http health at %#v", cd.Option.Health.ListenOption.TCPListen)
r := mux.NewRouter()
r.Path(cd.Option.Health.Path).HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusOK)
_, _ = writer.Write([]byte("success"))

r := gin.Default()
r.GET(cd.Option.Health.Path, func(c *gin.Context) {
c.JSON(http.StatusOK, http.StatusText(http.StatusOK))
})

listener, _, err := cd.prepareTCPListener(cd.Option.Health.ListenOption, false)
if err != nil {
logger.Fatalf("init health http server error: %v", err)
}

go func() {
logger.Infof("serve http health at %#v", cd.Option.Health.ListenOption.TCPListen)
if err = http.Serve(listener, r); err != nil {
if err == http.ErrServerClosed {
return
Expand Down
25 changes: 10 additions & 15 deletions client/daemon/peer/piece_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"io"
"net"
"net/http"
"strings"
"net/url"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/client/daemon/upload"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/source"
Expand Down Expand Up @@ -190,20 +189,16 @@ func (p *pieceDownloader) DownloadPiece(ctx context.Context, req *DownloadPieceR
}

func buildDownloadPieceHTTPRequest(ctx context.Context, d *DownloadPieceRequest) *http.Request {
b := strings.Builder{}
// FIXME switch to https when tls enabled
b.WriteString("http://")
b.WriteString(d.DstAddr)
b.WriteString(upload.PeerDownloadHTTPPathPrefix)
b.Write([]byte(d.TaskID)[:3])
b.Write([]byte("/"))
b.WriteString(d.TaskID)
b.Write([]byte("?peerId="))
b.WriteString(d.DstPid)

u := b.String()
logger.Debugf("built request url: %s", u)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
targetURL := url.URL{
Scheme: "http",
Host: d.DstAddr,
Path: fmt.Sprintf("download/%s/%s", d.TaskID[:3], d.TaskID),
RawQuery: fmt.Sprintf("peerId=%s", d.DstPid),
}

logger.Debugf("built request url: %s", targetURL.String())
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, targetURL.String(), nil)

// TODO use string.Builder
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d",
Expand Down
9 changes: 4 additions & 5 deletions client/daemon/peer/piece_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/daemon/test"
"d7y.io/dragonfly/v2/client/daemon/upload"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/source"
Expand All @@ -62,7 +61,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
}{
{
handleFunc: func(w http.ResponseWriter, r *http.Request) {
assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-0", r.URL.Path)
assert.Equal("/download/tas/task-0", r.URL.Path)
data := []byte("test test ")
w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", len(data)))
if _, err := w.Write(data); err != nil {
Expand All @@ -77,7 +76,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
},
{
handleFunc: func(w http.ResponseWriter, r *http.Request) {
assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-1", r.URL.Path)
assert.Equal("/download/tas/task-1", r.URL.Path)
rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64)
w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length))
if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil {
Expand All @@ -92,7 +91,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
},
{
handleFunc: func(w http.ResponseWriter, r *http.Request) {
assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-2", r.URL.Path)
assert.Equal("/download/tas/task-2", r.URL.Path)
rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64)
w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length))
if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil {
Expand All @@ -107,7 +106,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
},
{
handleFunc: func(w http.ResponseWriter, r *http.Request) {
assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-3", r.URL.Path)
assert.Equal("/download/tas/task-3", r.URL.Path)
rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64)
w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length))
if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil {
Expand Down
26 changes: 26 additions & 0 deletions client/daemon/upload/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package upload

type DownloadParams struct {
TaskPrefix string `uri:"task_prefix" binding:"required"`
TaskID string `uri:"task_id" binding:"required"`
}

type DownalodQuery struct {
PeerID string `form:"peerId" binding:"required"`
}
146 changes: 84 additions & 62 deletions client/daemon/upload/upload_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,132 +24,154 @@ import (
"net"
"net/http"

"github.com/gin-gonic/gin"
"github.com/go-http-utils/headers"
"github.com/gorilla/mux"
"golang.org/x/time/rate"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/daemon/storage"
logger "d7y.io/dragonfly/v2/internal/dflog"
)

var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation

// Manager is the interface used for upload task.
type Manager interface {
// Started upload manager server.
Serve(lis net.Listener) error

// Stop upload manager server.
Stop() error
}

// uploadManager provides upload manager function.
type uploadManager struct {
*http.Server
*rate.Limiter
StorageManager storage.Manager
storageManager storage.Manager
}

var _ Manager = (*uploadManager)(nil)

const (
PeerDownloadHTTPPathPrefix = "/download/"
)
// Option is a functional option for configuring the upload manager.
type Option func(um *uploadManager)

func NewUploadManager(s storage.Manager, opts ...func(*uploadManager)) (Manager, error) {
u := &uploadManager{
Server: &http.Server{},
StorageManager: s,
}
u.initRouter()
for _, opt := range opts {
opt(u)
}
return u, nil
}

// WithLimiter sets upload rate limiter, the burst size must be bigger than piece size
// WithLimiter sets upload rate limiter, the burst size must be bigger than piece size.
func WithLimiter(limiter *rate.Limiter) func(*uploadManager) {
return func(manager *uploadManager) {
manager.Limiter = limiter
}
}

func (um *uploadManager) initRouter() {
r := mux.NewRouter()
// Health Check
r.HandleFunc("/healthy", um.handleHealth).Methods("GET")
// New returns a new Manager instence.
func NewUploadManager(storageManager storage.Manager, opts ...Option) (Manager, error) {
um := &uploadManager{
storageManager: storageManager,
}

router := um.initRouter()
um.Server = &http.Server{
Handler: router,
}

for _, opt := range opts {
opt(um)
}

// Peer download task
r.HandleFunc(PeerDownloadHTTPPathPrefix+"{taskPrefix:.*}/"+"{task:.*}", um.handleUpload).Queries("peerId", "{.*}").Methods("GET")
um.Server.Handler = r
return um, nil
}

// Started upload manager server.
func (um *uploadManager) Serve(lis net.Listener) error {
return um.Server.Serve(lis)
}

// Stop upload manager server.
func (um *uploadManager) Stop() error {
return um.Server.Shutdown(context.Background())
}

// handleHealth uses to check server health.
func (um *uploadManager) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
// Initialize router of gin.
func (um *uploadManager) initRouter() *gin.Engine {
r := gin.Default()

// Health Check.
r.GET("/healthy", um.getHealth)

// Peer download task.
r.GET("/download/:task_prefix/:task_id", um.getDownload)

return r
}

// handleUpload uses to upload a task file when other peers download from it.
func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) {
var (
task = mux.Vars(r)["task"]
peer = r.FormValue("peerId")
//cdnSource = r.Header.Get("X-Dragonfly-CDN-Source")
)

sLogger := logger.With("peer", peer, "task", task, "component", "uploadManager")
sLogger.Debugf("upload piece for task %s/%s to %s, request header: %#v", task, peer, r.RemoteAddr, r.Header)
rg, err := clientutil.ParseRange(r.Header.Get(headers.Range), math.MaxInt64)
// getHealth uses to check server health.
func (um *uploadManager) getHealth(ctx *gin.Context) {
ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
}

// getDownload uses to upload a task file when other peers download from it.
func (um *uploadManager) getDownload(ctx *gin.Context) {
var params DownloadParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

var query DownalodQuery
if err := ctx.ShouldBindQuery(&query); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

taskID := params.TaskID
peerID := query.PeerID

log := logger.WithTaskAndPeerID(taskID, peerID).With("component", "uploadManager")
log.Debugf("upload piece for task %s/%s to %s, request header: %#v", taskID, peerID, ctx.Request.RemoteAddr, ctx.Request.Header)
rg, err := clientutil.ParseRange(ctx.GetHeader(headers.Range), math.MaxInt64)
if err != nil {
sLogger.Errorf("parse range with error: %s", err)
http.Error(w, err.Error(), http.StatusBadRequest)
log.Errorf("parse range with error: %s", err)
ctx.JSON(http.StatusBadRequest, gin.H{"errors": err.Error()})
return
}

if len(rg) != 1 {
sLogger.Error("multi range parsed, not support")
http.Error(w, "invalid range", http.StatusBadRequest)
log.Error("multi range parsed, not support")
ctx.JSON(http.StatusBadRequest, gin.H{"errors": "invalid range"})
return
}

// add header "Content-Length" to avoid chunked body in http client
w.Header().Add(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length))
reader, closer, err := um.StorageManager.ReadPiece(r.Context(),
// Add header "Content-Length" to avoid chunked body in http client.
ctx.Header(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length))
reader, closer, err := um.storageManager.ReadPiece(ctx,
&storage.ReadPieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
TaskID: task,
PeerID: peer,
TaskID: taskID,
PeerID: peerID,
},
PieceMetadata: storage.PieceMetadata{
Num: -1,
Range: rg[0],
},
})
if err != nil {
sLogger.Errorf("get task data failed: %s", err)
http.Error(w, fmt.Sprintf("get piece data error: %s", err), http.StatusInternalServerError)
log.Errorf("get task data failed: %s", err)
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
defer closer.Close()

if um.Limiter != nil {
if err = um.Limiter.WaitN(r.Context(), int(rg[0].Length)); err != nil {
sLogger.Errorf("get limit failed: %s", err)
http.Error(w, fmt.Sprintf("get limit error: %s", err), http.StatusInternalServerError)
if err = um.Limiter.WaitN(ctx, int(rg[0].Length)); err != nil {
log.Errorf("get limit failed: %s", err)
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
}

// if w is a socket, golang will use sendfile or splice syscall for zero copy feature
// when start to transfer data, we could not call http.Error with header
if n, err := io.Copy(w, reader); err != nil {
sLogger.Errorf("transfer data failed: %s", err)
// If w is a socket, golang will use sendfile or splice syscall for zero copy feature
// when start to transfer data, we could not call http.Error with header.
if n, err := io.Copy(ctx.Writer, reader); err != nil {
log.Errorf("transfer data failed: %s", err)
return
} else if n != rg[0].Length {
sLogger.Errorf("transferred data length not match request, request: %d, transferred: %d",
log.Errorf("transferred data length not match request, request: %d, transferred: %d",
rg[0].Length, n)
return
}
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/upload/upload_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestUploadManager_Serve(t *testing.T) {

for _, tt := range tests {
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("http://%s%s%s/%s?peerId=%s", addr, PeerDownloadHTTPPathPrefix, "666", tt.taskID, tt.peerID), nil)
fmt.Sprintf("http://%s/%s/%s/%s?peerId=%s", addr, "download", "666", tt.taskID, tt.peerID), nil)
req.Header.Add("Range", tt.pieceRange)

resp, err := http.DefaultClient.Do(req)
Expand Down
2 changes: 1 addition & 1 deletion manager/handlers/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ import (
// @Failure 500
// @Router /healthy [get]
func (h *Handlers) GetHealth(ctx *gin.Context) {
ctx.JSON(http.StatusOK, "OK")
ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
}

0 comments on commit b42e2b2

Please sign in to comment.