Skip to content

Commit

Permalink
Refactor: use universal FileHeader when handling file upload, remove …
Browse files Browse the repository at this point in the history
…usage of global ctx with FileHeader, SavePath, DisableOverwrite
  • Loading branch information
HFO4 committed Feb 27, 2022
1 parent 8a222e7 commit 868a88e
Show file tree
Hide file tree
Showing 39 changed files with 331 additions and 359 deletions.
23 changes: 22 additions & 1 deletion models/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package model

import (
"encoding/gob"
"encoding/json"
"path"
"time"

Expand All @@ -20,12 +21,15 @@ type File struct {
PicInfo string
FolderID uint `gorm:"index:folder_id;unique_index:idx_only_one"`
PolicyID uint
Hidden bool
Metadata string `gorm:"type:text"`

// 关联模型
Policy Policy `gorm:"PRELOAD:false,association_autoupdate:false"`

// 数据库忽略字段
Position string `gorm:"-"`
Position string `gorm:"-"`
MetadataSerialized map[string]string `gorm:"-"`
}

func init() {
Expand All @@ -42,6 +46,23 @@ func (file *File) Create() (uint, error) {
return file.ID, nil
}

// AfterFind 找到文件后的钩子
func (file *File) AfterFind() (err error) {
// 反序列化文件元数据
if file.Metadata != "" {
err = json.Unmarshal([]byte(file.Metadata), &file.MetadataSerialized)
}

return
}

// BeforeSave Save策略前的钩子
func (file *File) BeforeSave() (err error) {
metaValue, err := json.Marshal(&file.MetadataSerialized)
file.Metadata = string(metaValue)
return err
}

// GetChildFile 查找目录下名为name的子文件
func (folder *Folder) GetChildFile(name string) (*File, error) {
var file File
Expand Down
13 changes: 6 additions & 7 deletions pkg/aria2/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/local"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
Expand Down Expand Up @@ -191,12 +190,12 @@ func (monitor *Monitor) ValidateFile() error {
defer fs.Recycle()

// 创建上下文环境
ctx := context.WithValue(context.Background(), fsctx.FileHeaderCtx, local.FileStream{
file := &fsctx.FileStream{
Size: monitor.Task.TotalSize,
})
}

// 验证用户容量
if err := filesystem.HookValidateCapacityWithoutIncrease(ctx, fs); err != nil {
if err := filesystem.HookValidateCapacityWithoutIncrease(context.Background(), fs, file); err != nil {
return err
}

Expand All @@ -205,11 +204,11 @@ func (monitor *Monitor) ValidateFile() error {
if fileInfo.Selected == "true" {
// 创建上下文环境
fileSize, _ := strconv.ParseUint(fileInfo.Length, 10, 64)
ctx := context.WithValue(context.Background(), fsctx.FileHeaderCtx, local.FileStream{
file := &fsctx.FileStream{
Size: fileSize,
Name: filepath.Base(fileInfo.Path),
})
if err := filesystem.HookValidateFile(ctx, fs); err != nil {
}
if err := filesystem.HookValidateFile(context.Background(), fs, file); err != nil {
return err
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/filesystem/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,13 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst string) error {
}
}()

err = fs.UploadFromStream(ctx, fileStream, savePath, uint64(size))
err = fs.UploadFromStream(ctx, &fsctx.FileStream{
File: fileStream,
Size: uint64(size),
Name: path.Base(dst),
VirtualPath: path.Dir(dst),
Mode: fsctx.Create,
})
fileStream.Close()
if err != nil {
util.Log().Debug("无法上传压缩包内的文件%s , %s , 跳过", rawPath, err)
Expand Down
16 changes: 5 additions & 11 deletions pkg/filesystem/driver/cos/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
}

// Put 将文件流保存到指定目录
func (handler Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
opt := &cossdk.ObjectPutOptions{}
_, err := handler.Client.Object.Put(ctx, dst, file, opt)
_, err := handler.Client.Object.Put(ctx, file.GetSavePath(), file, opt)
return err
}

Expand Down Expand Up @@ -324,27 +324,21 @@ func (handler Driver) signSourceURL(ctx context.Context, path string, ttl int64,
}

// Token 获取上传策略和认证Token
func (handler Driver) Token(ctx context.Context, TTL int64, uploadSession *serializer.UploadSession) (serializer.UploadCredential, error) {
// 读取上下文中生成的存储路径
savePath, ok := ctx.Value(fsctx.SavePathCtx).(string)
if !ok {
return serializer.UploadCredential{}, errors.New("无法获取存储路径")
}

func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) {
// 生成回调地址
siteURL := model.GetSiteURL()
apiBaseURI, _ := url.Parse("/api/v3/callback/cos/" + uploadSession.Key)
apiURL := siteURL.ResolveReference(apiBaseURI).String()

// 上传策略
startTime := time.Now()
endTime := startTime.Add(time.Duration(TTL) * time.Second)
endTime := startTime.Add(time.Duration(ttl) * time.Second)
keyTime := fmt.Sprintf("%d;%d", startTime.Unix(), endTime.Unix())
postPolicy := UploadPolicy{
Expiration: endTime.UTC().Format(time.RFC3339),
Conditions: []interface{}{
map[string]string{"bucket": handler.Policy.BucketName},
map[string]string{"$key": savePath},
map[string]string{"$key": file.GetSavePath()},
map[string]string{"x-cos-meta-callback": apiURL},
map[string]string{"x-cos-meta-key": uploadSession.Key},
map[string]string{"q-sign-algorithm": "sha1"},
Expand Down
6 changes: 3 additions & 3 deletions pkg/filesystem/driver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package driver

import (
"context"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"io"
"net/url"
)

// Handler 存储策略适配器
type Handler interface {
// 上传文件, dst为文件存储路径,size 为文件大小。上下文关闭
// 时,应取消上传并清理临时文件
Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error
Put(ctx context.Context, file fsctx.FileHeader) error

// 删除一个或多个给定路径的文件,返回删除失败的文件路径列表及错误
Delete(ctx context.Context, files []string) ([]string, error)
Expand All @@ -30,7 +30,7 @@ type Handler interface {
Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error)

// Token 获取有效期为ttl的上传凭证和签名
Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession) (serializer.UploadCredential, error)
Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error)

// List 递归列取远程端path路径下文件、目录,不包含path本身,
// 返回的对象路径以path作为起始根目录.
Expand Down
38 changes: 0 additions & 38 deletions pkg/filesystem/driver/local/file.go

This file was deleted.

10 changes: 5 additions & 5 deletions pkg/filesystem/driver/local/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
}

// Put 将文件流保存到指定目录
func (handler Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
defer file.Close()
dst = util.RelativePath(filepath.FromSlash(dst))
dst := util.RelativePath(filepath.FromSlash(file.GetSavePath()))

// 如果禁止了 Overwrite,则检查是否有重名冲突
if ctx.Value(fsctx.DisableOverwrite) != nil {
// 如果非 Overwrite,则检查是否有重名冲突
if file.GetMode() != fsctx.Overwrite {
if util.Exists(dst) {
util.Log().Warning("物理同名文件已存在或不可用: %s", dst)
return errors.New("物理同名文件已存在或不可用")
Expand Down Expand Up @@ -214,7 +214,7 @@ func (handler Driver) Source(
}

// Token 获取上传策略和认证Token,本地策略直接返回空值
func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession) (serializer.UploadCredential, error) {
func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) {
return serializer.UploadCredential{
SessionID: uploadSession.Key,
}, nil
Expand Down
7 changes: 5 additions & 2 deletions pkg/filesystem/driver/onedrive/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,16 @@ func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *
}

// Upload 上传文件
func (client *Client) Upload(ctx context.Context, dst string, size int, file io.Reader) error {
func (client *Client) Upload(ctx context.Context, file fsctx.FileHeader) error {
// 决定是否覆盖文件
overwrite := "replace"
if ctx.Value(fsctx.DisableOverwrite) != nil {
if file.GetMode() != fsctx.Overwrite {
overwrite = "fail"
}

size := int(file.GetSize())
dst := file.GetSavePath()

// 小文件,使用简单上传接口上传
if size <= int(SmallFileSize) {
_, err := client.SimpleUpload(ctx, dst, file, int64(size), WithConflictBehavior(overwrite))
Expand Down
23 changes: 6 additions & 17 deletions pkg/filesystem/driver/onedrive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"net/url"
"path"
"path/filepath"
Expand Down Expand Up @@ -121,9 +120,9 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
}

// Put 将文件流保存到指定目录
func (handler Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
defer file.Close()
return handler.Client.Upload(ctx, dst, int(size), file)
return handler.Client.Upload(ctx, file)
}

// Delete 删除一个或多个文件,
Expand Down Expand Up @@ -223,20 +222,10 @@ func (handler Driver) replaceSourceHost(origin string) (string, error) {
}

// Token 获取上传会话URL
func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession) (serializer.UploadCredential, error) {

// 读取上下文中生成的存储路径和文件大小
savePath, ok := ctx.Value(fsctx.SavePathCtx).(string)
if !ok {
return serializer.UploadCredential{}, errors.New("无法获取存储路径")
}
fileSize, ok := ctx.Value(fsctx.FileSizeCtx).(uint64)
if !ok {
return serializer.UploadCredential{}, errors.New("无法获取文件大小")
}
func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) {

// 如果小于4MB,则由服务端中转
if fileSize <= SmallFileSize {
if file.GetSize() <= SmallFileSize {
return serializer.UploadCredential{}, nil
}

Expand All @@ -245,13 +234,13 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria
apiBaseURI, _ := url.Parse("/api/v3/callback/onedrive/finish/" + uploadSession.Key)
apiURL := siteURL.ResolveReference(apiBaseURI)

uploadURL, err := handler.Client.CreateUploadSession(ctx, savePath, WithConflictBehavior("fail"))
uploadURL, err := handler.Client.CreateUploadSession(ctx, file.GetSavePath(), WithConflictBehavior("fail"))
if err != nil {
return serializer.UploadCredential{}, err
}

// 监控回调及上传
go handler.Client.MonitorUpload(uploadURL, uploadSession.Key, savePath, fileSize, ttl)
go handler.Client.MonitorUpload(uploadURL, uploadSession.Key, file.GetSavePath(), file.GetSize(), ttl)

return serializer.UploadCredential{
Policy: uploadURL,
Expand Down
10 changes: 5 additions & 5 deletions pkg/filesystem/driver/onedrive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ func TestDriver_Token(t *testing.T) {
// 无法获取文件路径
{
ctx := context.WithValue(context.Background(), fsctx.FileSizeCtx, uint64(10))
res, err := handler.Token(ctx, 10, "key")
res, err := handler.Token(ctx, 10, "key", nil)
asserts.Error(err)
asserts.Equal(serializer.UploadCredential{}, res)
}

// 无法获取文件大小
{
ctx := context.WithValue(context.Background(), fsctx.SavePathCtx, "/123")
res, err := handler.Token(ctx, 10, "key")
res, err := handler.Token(ctx, 10, "key", nil)
asserts.Error(err)
asserts.Equal(serializer.UploadCredential{}, res)
}
Expand All @@ -53,7 +53,7 @@ func TestDriver_Token(t *testing.T) {
{
ctx := context.WithValue(context.Background(), fsctx.SavePathCtx, "/123")
ctx = context.WithValue(ctx, fsctx.FileSizeCtx, uint64(10))
res, err := handler.Token(ctx, 10, "key")
res, err := handler.Token(ctx, 10, "key", nil)
asserts.NoError(err)
asserts.Equal(serializer.UploadCredential{}, res)
}
Expand All @@ -80,7 +80,7 @@ func TestDriver_Token(t *testing.T) {
handler.Client.Request = clientMock
ctx := context.WithValue(context.Background(), fsctx.SavePathCtx, "/123")
ctx = context.WithValue(ctx, fsctx.FileSizeCtx, uint64(20*1024*1024))
res, err := handler.Token(ctx, 10, "key")
res, err := handler.Token(ctx, 10, "key", nil)
asserts.Error(err)
asserts.Equal(serializer.UploadCredential{}, res)
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestDriver_Token(t *testing.T) {
time.Sleep(time.Duration(1) * time.Second)
FinishCallback("key")
}()
res, err := handler.Token(ctx, 10, "key")
res, err := handler.Token(ctx, 10, "key", nil)
asserts.NoError(err)
asserts.Equal("123321", res.Policy)
}
Expand Down
Loading

0 comments on commit 868a88e

Please sign in to comment.