Skip to content

Commit

Permalink
added part of the system tasks logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jul 30, 2020
1 parent ec81344 commit d525099
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 54 deletions.
1 change: 1 addition & 0 deletions backend/conf/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ server:
java: "N"
dotnet: "N"
php: "N"
scripts: "/app/backend/scripts"
spider:
path: "/app/spiders"
task:
Expand Down
5 changes: 5 additions & 0 deletions backend/constants/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ const (
RunTypeRandom string = "random"
RunTypeSelectedNodes string = "selected-nodes"
)

const (
TaskTypeSpider string = "spider"
TaskTypeSystem string = "system"
)
5 changes: 5 additions & 0 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ func main() {
authGroup.POST("/tasks-cancel", routes.CancelSelectedTask) // 批量取消任务
authGroup.POST("/tasks-restart", routes.RestartSelectedTask) // 批量重试任务
}
// 系统任务/脚本
{
authGroup.PUT("/system-tasks", routes.PutSystemTask) // 运行系统任务
authGroup.GET("/system-scripts", routes.GetSystemScripts) // 获取系统脚本列表
}
// 定时任务
{
authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表
Expand Down
1 change: 1 addition & 0 deletions backend/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Task struct {
Pid int `json:"pid" bson:"pid"`
RunType string `json:"run_type" bson:"run_type"`
ScheduleId bson.ObjectId `json:"schedule_id" bson:"schedule_id"`
Type string `json:"type" bson:"type"`

// 前端数据
SpiderName string `json:"spider_name"`
Expand Down
3 changes: 3 additions & 0 deletions backend/routes/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ func RunSelectedSpider(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeAllNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}

id, err := services.AddTask(t)
Expand All @@ -830,6 +831,7 @@ func RunSelectedSpider(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeRandom,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
if err != nil {
Expand All @@ -847,6 +849,7 @@ func RunSelectedSpider(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeSelectedNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}

id, err := services.AddTask(t)
Expand Down
116 changes: 116 additions & 0 deletions backend/routes/system_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package routes

import (
"crawlab/constants"
"crawlab/model"
"crawlab/services"
"crawlab/utils"
"fmt"
"github.com/gin-gonic/gin"
"github.com/globalsign/mgo/bson"
"net/http"
)

func GetSystemScripts(c *gin.Context) {
HandleSuccessData(c, utils.GetSystemScripts())
}

func PutSystemTask(c *gin.Context) {
type TaskRequestBody struct {
RunType string `json:"run_type"`
NodeIds []bson.ObjectId `json:"node_ids"`
Script string `json:"script"`
}

// 绑定数据
var reqBody TaskRequestBody
if err := c.ShouldBindJSON(&reqBody); err != nil {
HandleError(http.StatusBadRequest, c, err)
return
}

// 校验脚本参数不为空
if reqBody.Script == "" {
HandleErrorF(http.StatusBadRequest, c, "script cannot be empty")
return
}

// 校验脚本参数是否存在
var allScripts = utils.GetSystemScripts()
if !utils.StringArrayContains(allScripts, reqBody.Script) {
HandleErrorF(http.StatusBadRequest, c, "script does not exist")
return
}

// 获取执行命令
cmd := fmt.Sprintf("sh %s", utils.GetSystemScriptPath(reqBody.Script))

// 任务ID
var taskIds []string

if reqBody.RunType == constants.RunTypeAllNodes {
// 所有节点
nodes, err := model.GetNodeList(nil)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
for _, node := range nodes {
t := model.Task{
SpiderId: bson.ObjectIdHex(constants.ObjectIdNull),
NodeId: node.Id,
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeAllNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSystem,
Cmd: cmd,
}
id, err := services.AddTask(t)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
taskIds = append(taskIds, id)
}
} else if reqBody.RunType == constants.RunTypeRandom {
// 随机
t := model.Task{
SpiderId: bson.ObjectIdHex(constants.ObjectIdNull),
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeRandom,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSystem,
Cmd: cmd,
}
id, err := services.AddTask(t)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
taskIds = append(taskIds, id)
} else if reqBody.RunType == constants.RunTypeSelectedNodes {
// 指定节点
for _, nodeId := range reqBody.NodeIds {
t := model.Task{
SpiderId: bson.ObjectIdHex(constants.ObjectIdNull),
NodeId: nodeId,
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeSelectedNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSystem,
Cmd: cmd,
}
id, err := services.AddTask(t)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
taskIds = append(taskIds, id)
}
} else {
HandleErrorF(http.StatusInternalServerError, c, "invalid run_type")
return
}

HandleSuccessData(c, taskIds)
}
10 changes: 10 additions & 0 deletions backend/routes/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type TaskListRequestData struct {
SpiderId string `form:"spider_id"`
ScheduleId string `form:"schedule_id"`
Status string `form:"status"`
Type string `form:"type"`
}

type TaskResultsRequestData struct {
Expand Down Expand Up @@ -64,6 +65,9 @@ func GetTaskList(c *gin.Context) {
if data.ScheduleId != "" {
query["schedule_id"] = bson.ObjectIdHex(data.ScheduleId)
}
if data.Type != "" {
query["type"] = data.Type
}

// 获取校验
query = services.GetAuthQuery(query, c)
Expand Down Expand Up @@ -150,6 +154,7 @@ func PutTask(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeAllNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}

id, err := services.AddTask(t)
Expand All @@ -168,6 +173,7 @@ func PutTask(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeRandom,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
if err != nil {
Expand All @@ -185,6 +191,7 @@ func PutTask(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeSelectedNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}

id, err := services.AddTask(t)
Expand Down Expand Up @@ -225,6 +232,7 @@ func PutBatchTasks(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeAllNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}

id, err := services.AddTask(t)
Expand All @@ -242,6 +250,7 @@ func PutBatchTasks(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeRandom,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
if err != nil {
Expand All @@ -259,6 +268,7 @@ func PutBatchTasks(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeSelectedNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}

id, err := services.AddTask(t)
Expand Down
3 changes: 3 additions & 0 deletions backend/services/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func AddScheduleTask(s model.Schedule) func() {
UserId: s.UserId,
RunType: constants.RunTypeAllNodes,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}

if _, err := AddTask(t); err != nil {
Expand All @@ -73,6 +74,7 @@ func AddScheduleTask(s model.Schedule) func() {
UserId: s.UserId,
RunType: constants.RunTypeRandom,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}
if _, err := AddTask(t); err != nil {
log.Errorf(err.Error())
Expand All @@ -90,6 +92,7 @@ func AddScheduleTask(s model.Schedule) func() {
UserId: s.UserId,
RunType: constants.RunTypeSelectedNodes,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}

if _, err := AddTask(t); err != nil {
Expand Down
Loading

0 comments on commit d525099

Please sign in to comment.