Skip to content

Commit

Permalink
更新后台任务, 取消调度前台
Browse files Browse the repository at this point in the history
  • Loading branch information
iikira committed May 25, 2018
1 parent 07b940c commit 3ccc710
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 350 deletions.
113 changes: 58 additions & 55 deletions baidupcs/file_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (
"unsafe"
)

// OrderBy 排序字段
type OrderBy string

// Order 升序降序
type Order string
type (
// OrderBy 排序字段
OrderBy string
// Order 升序降序
Order string
)

const (
// OrderByName 根据文件名排序
Expand All @@ -32,60 +33,62 @@ const (
OrderDesc Order = "desc"
)

// HandleFileDirectoryFunc 处理文件或目录的元信息
type HandleFileDirectoryFunc func(depth int, fd *FileDirectory)

// FileDirectory 文件或目录的元信息
type FileDirectory struct {
FsID int64 // fs_id
Path string // 路径
Filename string // 文件名 或 目录名
Ctime int64 // 创建日期
Mtime int64 // 修改日期
MD5 string // md5 值
Size int64 // 文件大小 (目录为0)
Isdir bool // 是否为目录
Ifhassubdir bool // 是否含有子目录 (只对目录有效)

Parent *FileDirectory // 父目录信息
Children FileDirectoryList // 子目录信息
}
type (
// HandleFileDirectoryFunc 处理文件或目录的元信息
HandleFileDirectoryFunc func(depth int, fd *FileDirectory)

// FileDirectory 文件或目录的元信息
FileDirectory struct {
FsID int64 // fs_id
Path string // 路径
Filename string // 文件名 或 目录名
Ctime int64 // 创建日期
Mtime int64 // 修改日期
MD5 string // md5 值
Size int64 // 文件大小 (目录为0)
Isdir bool // 是否为目录
Ifhassubdir bool // 是否含有子目录 (只对目录有效)

Parent *FileDirectory // 父目录信息
Children FileDirectoryList // 子目录信息
}

// FileDirectoryList FileDirectory 的 指针数组
type FileDirectoryList []*FileDirectory

// fdJSON 用于解析远程JSON数据
type fdJSON struct {
FsID int64 `json:"fs_id"` // fs_id
Path string `json:"path"` // 路径
Filename string `json:"server_filename"` // 文件名 或 目录名
Ctime int64 `json:"ctime"` // 创建日期
Mtime int64 `json:"mtime"` // 修改日期
MD5 string `json:"md5"` // md5 值
Size int64 `json:"size"` // 文件大小 (目录为0)
IsdirInt int8 `json:"isdir"`
IfhassubdirInt int8 `json:"ifhassubdir"`

// 对齐
_ *fdJSON
_ []*fdJSON
}
// FileDirectoryList FileDirectory 的 指针数组
FileDirectoryList []*FileDirectory

// fdJSON 用于解析远程JSON数据
fdJSON struct {
FsID int64 `json:"fs_id"` // fs_id
Path string `json:"path"` // 路径
Filename string `json:"server_filename"` // 文件名 或 目录名
Ctime int64 `json:"ctime"` // 创建日期
Mtime int64 `json:"mtime"` // 修改日期
MD5 string `json:"md5"` // md5 值
Size int64 `json:"size"` // 文件大小 (目录为0)
IsdirInt int8 `json:"isdir"`
IfhassubdirInt int8 `json:"ifhassubdir"`

// 对齐
_ *fdJSON
_ []*fdJSON
}

type fdData struct {
*ErrInfo
List []*FileDirectory
}
fdData struct {
*ErrInfo
List []*FileDirectory
}

type fdDataJSONExport struct {
*ErrInfo
List []*fdJSON `json:"list"`
}
fdDataJSONExport struct {
*ErrInfo
List []*fdJSON `json:"list"`
}

// OrderOptions 列文件/目录可选项
type OrderOptions struct {
By OrderBy
Order Order
}
// OrderOptions 列文件/目录可选项
OrderOptions struct {
By OrderBy
Order Order
}
)

// DefaultOrderOptions 默认的排序
var DefaultOrderOptions = &OrderOptions{
Expand Down
151 changes: 68 additions & 83 deletions internal/pcscommand/bg_fg.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,46 @@
package pcscommand

import (
"sync"
"fmt"
"github.com/iikira/BaiduPCS-Go/pcstable"
"os"
"time"
"strconv"
"strings"
"crypto/md5"

"github.com/iikira/BaiduPCS-Go/requester/downloader"
"github.com/iikira/BaiduPCS-Go/pcstable"
"sync"
"sync/atomic"
)

var (
bgMap = bgTasks{
// BgMap 后台
BgMap = BgTasks{
tasks: sync.Map{},
ticker: time.NewTicker(1*time.Minute),
sig: make(chan struct{}),
}
)

func init() {
bgMap.cycleCheck()
}

type bgTasks struct {
tasks sync.Map
ticker *time.Ticker
checkStarted bool
}
type (
// BgTasks 后台任务
BgTasks struct {
lastID int64
tasks sync.Map
started bool
sig chan struct{}
}
// BgDTaskItem 后台任务详情
BgDTaskItem struct {
id int
downloadOptions *DownloadOptions
pcspaths []string
done <-chan struct{}
}
)

func (b *bgTasks)checkDoneTask() {
func (b *BgTasks) checkDoneTask() {
b.tasks.Range(func(id, v interface{}) bool {
task := v.(*bgDTaskItem)
task := v.(*BgDTaskItem)
select {
case <- task.Done:
fmt.Printf("任务:%v 已完成\n", id.(string))
case <-task.done:
fmt.Printf("任务:%d 已完成\n", id.(int64))
b.tasks.Delete(id)
return true
default:
Expand All @@ -43,79 +49,58 @@ func (b *bgTasks)checkDoneTask() {
})
}

// 周期性检查是否有后台任务完成
func (b *bgTasks)cycleCheck() {
if b.checkStarted {
return
}

b.checkStarted = true
go func() {
for {
select {
case <- b.ticker.C:
b.checkDoneTask()
}
}
}()
}

type bgDTaskItem struct {
paths []string
outputcontrol *downloader.OutputController
Done chan struct{}
// NewID 返回生成的 ID
func (b *BgTasks) NewID() int64 {
id := atomic.AddInt64(&b.lastID, 1)
return id
}

// TaskID 以paths生成md5,取前10位做taskID
func (t *bgDTaskItem)TaskID() string {
data := strings.Join(t.paths, "")
has := md5.Sum([]byte(data))
id := fmt.Sprintf("%x", has)
return string(id[:10])
// TaskID 返回后台任务 id
func (t *BgDTaskItem) TaskID() int {
return t.id
}

func PrintAllBgTask() {
// PrintAllBgTask 输出所有的后台任务
func (b *BgTasks) PrintAllBgTask() {
tb := pcstable.NewTable(os.Stdout)
tb.SetHeader([]string{"task_id", "downloading files"})
bgMap.tasks.Range(func(id, v interface{}) bool {
tb.Append([]string{id.(string), strings.Join(v.(*bgDTaskItem).paths, ",")})
tb.SetHeader([]string{"task_id", "files"})
b.tasks.Range(func(id, v interface{}) bool {
tb.Append([]string{strconv.FormatInt(id.(int64), 10), strings.Join(v.(*BgDTaskItem).pcspaths, ",")})
return true
})
tb.Render()
}

func RunBgDownload(paths []string, option DownloadOption) {
task := new(bgDTaskItem)
task.paths = make([]string, 0, len(paths))
task.paths = append(task.paths, paths...)
task.outputcontrol = downloader.NewOutputController()
task.Done = make(chan struct{})

taskID := task.TaskID()
_, exists := bgMap.tasks.Load(taskID)
if exists {
fmt.Printf("下载任务 ID:%v 已在后台下载任务中\n", taskID)
return
// RunBgDownload 执行后台下载
func RunBgDownload(paths []string, options *DownloadOptions) {
if !BgMap.started {
go func() {
for {
select {
case <-BgMap.sig:
BgMap.checkDoneTask()
}
}
}()
} else {
BgMap.started = true
}

bgMap.tasks.Store(taskID, task)

option.BanOutput = task.outputcontrol
go func(done chan struct{}) {
RunDownload(paths, option, taskID)
close(done)
}(task.Done)
}

func RunFgDownload(taskID string) {
t, ok := bgMap.tasks.Load(taskID)
if !ok {
fmt.Printf("任务:%v 不存在\n", taskID)
return
if options.Out == nil {
options.Out, _ = os.Open(os.DevNull)
}

task := t.(*bgDTaskItem)
bgMap.checkDoneTask()
task.outputcontrol.SetTrigger(false)
}

task := new(BgDTaskItem)
task.pcspaths = paths

dchan := make(chan struct{})
task.done = dchan

BgMap.tasks.Store(BgMap.NewID(), task)

go func(dchan chan struct{}) {
RunDownload(paths, options)
close(dchan)
BgMap.sig <- struct{}{}
}(dchan)
}
Loading

0 comments on commit 3ccc710

Please sign in to comment.