-
Notifications
You must be signed in to change notification settings - Fork 322
/
Copy patheventorder_debugger_test.go
82 lines (71 loc) · 3.16 KB
/
eventorder_debugger_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package router
import (
"context"
"testing"
"time"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/internal/eventorder"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
)
func TestEventOrderDebugInfo(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)
pgContainer, err := postgres.Setup(pool, t, postgres.WithShmSize(256*bytesize.MB))
require.NoError(t, err)
m := &migrator.Migrator{
Handle: pgContainer.DB,
MigrationsTable: "node_migrations",
ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true),
}
require.NoError(t, m.Migrate("node"))
jdb := jobsdb.NewForReadWrite("rt", jobsdb.WithDBHandle(pgContainer.DB))
require.NoError(t, jdb.Start())
defer jdb.Stop()
err = jdb.Store(context.Background(), []*jobsdb.JobT{{
UserID: "user1",
WorkspaceId: "workspace1",
Parameters: []byte(`{"destination_id": "destination1"}`),
EventCount: 1,
EventPayload: []byte(`{"type": "track", "event": "test_event", "properties": {"key": "value"}}`),
CustomVal: "dummy",
}})
require.NoError(t, err)
jobs, err := jdb.GetJobs(context.Background(), []string{jobsdb.Unprocessed.State}, jobsdb.GetQueryParams{JobsLimit: 1})
require.NoError(t, err)
require.Len(t, jobs.Jobs, 1)
job := jobs.Jobs[0]
require.NoError(t, jdb.UpdateJobStatus(context.Background(), []*jobsdb.JobStatusT{{
WorkspaceId: "workspace1",
JobID: job.JobID,
JobState: jobsdb.Executing.State,
ErrorResponse: []byte("{}"),
Parameters: []byte(`{}`),
JobParameters: []byte(`{"destination_id", "destination1"}`),
}}, nil, nil))
require.NoError(t, jdb.UpdateJobStatus(context.Background(), []*jobsdb.JobStatusT{{
WorkspaceId: "workspace1",
JobID: job.JobID,
JobState: jobsdb.Succeeded.State,
ErrorResponse: []byte("{}"),
Parameters: []byte(`{}`),
JobParameters: []byte(`{"destination_id", "destination1"}`),
}}, nil, nil))
rt := &Handle{
jobsDB: jdb,
}
refTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
_, err = pgContainer.DB.Exec("UPDATE rt_jobs_1 SET created_at = $1", refTime)
require.NoError(t, err)
debugInfo := rt.eventOrderDebugInfo(eventorder.BarrierKey{UserID: "user1", DestinationID: "destination1"})
require.Equal(t,
` | t_name| job_id| created_at| status_id| job_state| attempt| exec_time| error_code| parameters| error_response|
| ---| ---| ---| ---| ---| ---| ---| ---| ---| ---|
| rt_jobs_1| 1| 2023-01-01 00:00:00 +0000 UTC| 1| executing| 0| 0001-01-01 00:00:00 +0000 UTC| | {}| {}|
| rt_jobs_1| 1| 2023-01-01 00:00:00 +0000 UTC| 2| succeeded| 0| 0001-01-01 00:00:00 +0000 UTC| | {}| {}|
`, debugInfo)
}