Feature: Job logs (#354)
* Feature: Job logs

* Increase max log page size to 100
runabol authored Mar 25, 2024
1 parent e706b2a commit a3fd37d
Showing 6 changed files with 217 additions and 8 deletions.
1 change: 1 addition & 0 deletions datastore/datastore.go
@@ -37,6 +37,7 @@ type Datastore interface {
CreateJob(ctx context.Context, j *tork.Job) error
UpdateJob(ctx context.Context, id string, modify func(u *tork.Job) error) error
GetJobByID(ctx context.Context, id string) (*tork.Job, error)
GetJobLogParts(ctx context.Context, jobID string, page, size int) (*Page[*tork.TaskLogPart], error)
GetJobs(ctx context.Context, q string, page, size int) (*Page[*tork.JobSummary], error)

GetMetrics(ctx context.Context) (*tork.Metrics, error)
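For orientation, a minimal sketch, not part of this commit, of how a caller might drain a job's full log through the new interface method. The printJobLogs helper name is hypothetical; the Page fields it uses (Items, TotalPages) are the ones visible in this diff, and the import path assumes this repository's module.

package main

import (
	"context"
	"fmt"

	"github.com/runabol/tork/datastore"
)

// printJobLogs pages through GetJobLogParts until the last page,
// printing each log line as it goes.
func printJobLogs(ctx context.Context, ds datastore.Datastore, jobID string) error {
	for page := 1; ; page++ {
		p, err := ds.GetJobLogParts(ctx, jobID, page, 100) // 100 matches the new max page size
		if err != nil {
			return err
		}
		for _, part := range p.Items {
			fmt.Println(part.Contents)
		}
		if page >= p.TotalPages {
			return nil
		}
	}
}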
47 changes: 47 additions & 0 deletions datastore/inmemory/inmemory.go
@@ -327,6 +327,53 @@ func (ds *InMemoryDatastore) GetTaskLogParts(ctx context.Context, taskID string,
}, nil
}

func (ds *InMemoryDatastore) GetJobLogParts(ctx context.Context, jobID string, page, size int) (*datastore.Page[*tork.TaskLogPart], error) {
execution := ds.getExecution(jobID)
allParts := make([]*tork.TaskLogPart, 0)
for _, task := range execution {
parts, ok := ds.logs.Get(task.ID)
if ok {
allParts = append(allParts, parts...)
}
}
if len(allParts) == 0 {
return &datastore.Page[*tork.TaskLogPart]{
Items: make([]*tork.TaskLogPart, 0),
Number: 1,
Size: 0,
TotalPages: 0,
TotalItems: 0,
}, nil
}
sort.Slice(allParts, func(i, j int) bool {
ti, _ := ds.tasks.Get(allParts[i].TaskID)
tj, _ := ds.tasks.Get(allParts[j].TaskID)
if ti.Position > tj.Position {
return true
} else if ti.Position < tj.Position {
return false
}
return allParts[i].Number > allParts[j].Number
})
offset := (page - 1) * size
result := make([]*tork.TaskLogPart, 0)
for i := offset; i < (offset+size) && i < len(allParts); i++ {
p := allParts[i]
result = append(result, p)
}
totalPages := len(allParts) / size
if len(allParts)%size != 0 {
totalPages = totalPages + 1
}
return &datastore.Page[*tork.TaskLogPart]{
Items: result,
Number: page,
Size: len(result),
TotalPages: totalPages,
TotalItems: len(allParts),
}, nil
}

func (ds *InMemoryDatastore) GetMetrics(ctx context.Context) (*tork.Metrics, error) {
s := &tork.Metrics{}

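The total-page arithmetic in GetJobLogParts above is plain ceiling division. A quick illustration with made-up values: 25 parts at a page size of 10 yield three pages, two full and one partial.

totalItems, size := 25, 10
totalPages := totalItems / size // 2
if totalItems%size != 0 {
	totalPages++ // 3: the final, partial page
}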
42 changes: 42 additions & 0 deletions datastore/inmemory/inmemory_test.go
@@ -441,3 +441,45 @@ func TestInMemoryCreateAndGetTaskLogsLarge(t *testing.T) {
assert.Equal(t, 10, logs.Size)
assert.Equal(t, 10, logs.TotalPages)
}

func TestInMemoryGetJobLogParts(t *testing.T) {
ctx := context.Background()
ds := inmemory.NewInMemoryDatastore()
jid := uuid.NewUUID()
t1 := tork.Task{
ID: uuid.NewUUID(),
JobID: jid,
}
err := ds.CreateTask(ctx, &t1)
assert.NoError(t, err)

logs, err := ds.GetJobLogParts(ctx, jid, 1, 10)
assert.NoError(t, err)
assert.Len(t, logs.Items, 0)

err = ds.CreateTaskLogPart(ctx, &tork.TaskLogPart{
Number: 1,
TaskID: t1.ID,
Contents: "line 1",
})
assert.NoError(t, err)

logs, err = ds.GetJobLogParts(ctx, jid, 1, 10)
assert.NoError(t, err)
assert.Len(t, logs.Items, 1)
assert.Equal(t, "line 1", logs.Items[0].Contents)
assert.Equal(t, 1, logs.TotalPages)

err = ds.CreateTaskLogPart(ctx, &tork.TaskLogPart{
Number: 2,
TaskID: t1.ID,
Contents: "line 2",
})
assert.NoError(t, err)

logs, err = ds.GetJobLogParts(ctx, jid, 1, 10)
assert.NoError(t, err)
assert.Len(t, logs.Items, 2)
assert.Equal(t, "line 2", logs.Items[0].Contents)
assert.Equal(t, 1, logs.TotalPages)
}
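A further sketch, not in this commit, of the cross-task ordering the in-memory sort produces. Position is set explicitly here because the test's t1 leaves it at zero; everything else reuses identifiers from the test above.

// Hypothetical continuation: a task at a later position sorts ahead
// of t1, and parts within a task sort newest-first.
t2 := tork.Task{ID: uuid.NewUUID(), JobID: jid, Position: 2}
err = ds.CreateTask(ctx, &t2)
assert.NoError(t, err)
err = ds.CreateTaskLogPart(ctx, &tork.TaskLogPart{
	Number:   1,
	TaskID:   t2.ID,
	Contents: "line 3",
})
assert.NoError(t, err)

logs, err = ds.GetJobLogParts(ctx, jid, 1, 10)
assert.NoError(t, err)
// Expected order: "line 3" (t2, position 2), then "line 2" and
// "line 1" (t1, part numbers descending).
assert.Equal(t, "line 3", logs.Items[0].Contents)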
47 changes: 43 additions & 4 deletions datastore/postgres/postgres.go
@@ -654,11 +654,11 @@ func (ds *PostgresDatastore) expungeExpiredTaskLogPart() (int, error) {
func (ds *PostgresDatastore) GetTaskLogParts(ctx context.Context, taskID string, page, size int) (*datastore.Page[*tork.TaskLogPart], error) {
offset := (page - 1) * size
rs := []taskLogPartRecord{}
-q := fmt.Sprintf(`SELECT *
-FROM tasks_log_parts
+q := fmt.Sprintf(`select *
+from tasks_log_parts
where task_id = $1
-ORDER BY number_ DESC
-OFFSET %d LIMIT %d`, offset, size)
+order by number_ DESC
+offset %d limit %d`, offset, size)

if err := ds.select_(&rs, q, taskID); err != nil {
return nil, errors.Wrapf(err, "error task log parts from db")
@@ -684,6 +684,45 @@ func (ds *PostgresDatastore) GetTaskLogParts(ctx context.Context, taskID string,
}, nil
}

func (ds *PostgresDatastore) GetJobLogParts(ctx context.Context, jobID string, page, size int) (*datastore.Page[*tork.TaskLogPart], error) {
offset := (page - 1) * size
rs := []taskLogPartRecord{}
q := fmt.Sprintf(`select tlp.*
from tasks_log_parts tlp
join tasks t
on t.id = tlp.task_id
where t.job_id = $1
order by t.position desc, t.created_at desc, tlp.number_ desc, tlp.created_at DESC
offset %d limit %d`, offset, size)

if err := ds.select_(&rs, q, jobID); err != nil {
return nil, errors.Wrapf(err, "error task log parts from db")
}
items := make([]*tork.TaskLogPart, len(rs))
for i, r := range rs {
items[i] = r.toTaskLogPart()
}
var count *int
if err := ds.get(&count, `select count(*)
from tasks_log_parts tlp
join tasks t
on t.id = tlp.task_id
where t.job_id = $1`, jobID); err != nil {
return nil, errors.Wrapf(err, "error getting the task log parts count")
}
totalPages := *count / size
if *count%size != 0 {
totalPages = totalPages + 1
}
return &datastore.Page[*tork.TaskLogPart]{
Items: items,
Number: page,
Size: len(items),
TotalPages: totalPages,
TotalItems: *count,
}, nil
}

func (ds *PostgresDatastore) GetJobs(ctx context.Context, q string, page, size int) (*datastore.Page[*tork.JobSummary], error) {
offset := (page - 1) * size
rs := make([]jobRecord, 0)
32 changes: 32 additions & 0 deletions datastore/postgres/postgres_test.go
@@ -1008,3 +1008,35 @@ func Test_cleanup(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, logs.TotalItems)
}

func TestPostgresGetJobLogParts(t *testing.T) {
ctx := context.Background()
dsn := "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable"
ds, err := NewPostgresDataStore(dsn)
assert.NoError(t, err)
now := time.Now().UTC()
j1 := tork.Job{
ID: uuid.NewUUID(),
}
err = ds.CreateJob(ctx, &j1)
assert.NoError(t, err)
t1 := tork.Task{
ID: uuid.NewUUID(),
CreatedAt: &now,
JobID: j1.ID,
}
err = ds.CreateTask(ctx, &t1)
assert.NoError(t, err)

err = ds.CreateTaskLogPart(ctx, &tork.TaskLogPart{
Number: 1,
TaskID: t1.ID,
Contents: "line 1",
})
assert.NoError(t, err)

logs, err := ds.GetJobLogParts(ctx, j1.ID, 1, 10)
assert.NoError(t, err)
assert.Len(t, logs.Items, 1)
assert.Equal(t, "line 1", logs.Items[0].Contents)
}
56 changes: 52 additions & 4 deletions internal/coordinator/api/api.go
@@ -30,8 +30,9 @@ import (
)

const (
-MIN_PORT = 8000
-MAX_PORT = 8100
+MIN_PORT          = 8000
+MAX_PORT          = 8100
+MAX_LOG_PAGE_SIZE = 100
)

type HealthResponse struct {
@@ -124,6 +125,7 @@ func NewAPI(cfg Config) (*API, error) {
if v, ok := cfg.Enabled["jobs"]; !ok || v {
r.POST("/jobs", s.createJob)
r.GET("/jobs/:id", s.getJob)
r.GET("/jobs/:id/log", s.getJobLog)
r.GET("/jobs", s.listJobs)
r.PUT("/jobs/:id/cancel", s.cancelJob)
r.PUT("/jobs/:id/restart", s.restartJob)
@@ -326,6 +328,52 @@ func (s *API) getJob(c echo.Context) error {
return c.JSON(http.StatusOK, j)
}

// getJobLog
// @Summary Get a job's log
// @Tags jobs
// @Produce application/json
// @Success 200 {object} []tork.TaskLogPart
// @Router /jobs/{id}/log [get]
// @Param id path string true "Job ID"
// @Param page query int false "page number"
// @Param size query int false "page size"
func (s *API) getJobLog(c echo.Context) error {
id := c.Param("id")
ps := c.QueryParam("page")
if ps == "" {
ps = "1"
}
page, err := strconv.Atoi(ps)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("invalid page number: %s", ps))
}
if page < 1 {
page = 1
}
si := c.QueryParam("size")
if si == "" {
si = "25"
}
size, err := strconv.Atoi(si)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("invalid size: %s", ps))
}
if size < 1 {
size = 1
} else if size > MAX_LOG_PAGE_SIZE {
size = MAX_LOG_PAGE_SIZE
}
_, err = s.ds.GetJobByID(c.Request().Context(), id)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, err.Error())
}
l, err := s.ds.GetJobLogParts(c.Request().Context(), id, page, size)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, err.Error())
}
return c.JSON(http.StatusOK, l)
}

// listJobs
// @Summary Show a list of jobs
// @Tags jobs
@@ -425,8 +473,8 @@ func (s *API) getTaskLog(c echo.Context) error {
}
if size < 1 {
size = 1
} else if size > 50 {
size = 50
} else if size > MAX_LOG_PAGE_SIZE {
size = MAX_LOG_PAGE_SIZE
}
_, err = s.ds.GetTaskByID(c.Request().Context(), id)
if err != nil {
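Finally, a hedged end-to-end sketch, not part of this commit, of hitting the new route. The coordinator address and the job ID placeholder are assumptions; adjust them for your setup.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical values: coordinator on localhost:8000, job ID substituted in.
	url := "http://localhost:8000/jobs/<job-id>/log?page=1&size=100"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // a JSON-encoded page of task log parts
}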
