Skip to content

Commit

Permalink
Rework crdb changefeed input
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Feb 8, 2024
1 parent ab0a2bc commit 489346d
Show file tree
Hide file tree
Showing 12 changed files with 984 additions and 458 deletions.
15 changes: 12 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
github.com/itchyny/gojq v0.12.14
github.com/itchyny/timefmt-go v0.1.5
github.com/jackc/pgx/v4 v4.18.1
github.com/jhump/protoreflect v1.15.4
github.com/jmespath/go-jmespath v0.4.0
github.com/klauspost/compress v1.17.4
Expand Down Expand Up @@ -121,7 +122,7 @@ require (
go.opentelemetry.io/otel/sdk v1.22.0
go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/multierr v1.11.0
golang.org/x/crypto v0.18.0
golang.org/x/crypto v0.19.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/net v0.20.0
golang.org/x/oauth2 v0.16.0
Expand Down Expand Up @@ -232,6 +233,14 @@ require (
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/pgtype v1.14.2 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
Expand Down Expand Up @@ -294,8 +303,8 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
Expand Down
120 changes: 114 additions & 6 deletions go.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/public/service"
)

func TestCRDBConfigParse(t *testing.T) {
conf := `
crdb_changefeed:
cockroach_changefeed:
dsn: postgresql://dan:[email protected]:26257/defaultdb?sslmode=require&options=--cluster%3Dportly-impala-2852
tables:
- strm_2
Expand All @@ -26,7 +27,9 @@ options:
selectConfig, err := spec.ParseYAML(conf, env)
require.NoError(t, err)

selectInput, err := newCRDBChangefeedInputFromConfig(selectConfig, nil)
selectInput, err := newCRDBChangefeedInputFromConfig(selectConfig, service.MockResources())
require.NoError(t, err)

assert.Equal(t, "EXPERIMENTAL CHANGEFEED FOR strm_2 WITH UPDATED, CURSOR='1637953249519902405.0000000000'", selectInput.statement)
require.NoError(t, selectInput.Close(context.Background()))
}
134 changes: 134 additions & 0 deletions internal/impl/cockroachdb/exploration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package crdb_test

import (
"context"
"database/sql"
"fmt"
"testing"
"time"

"github.com/Jeffail/gabs/v2"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

_ "github.com/lib/pq"

"github.com/benthosdev/benthos/v4/internal/integration"
_ "github.com/benthosdev/benthos/v4/public/components/io"
_ "github.com/benthosdev/benthos/v4/public/components/pure"
)

func TestIntegrationExploration(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

pool.MaxWait = time.Second * 30
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "cockroachdb/cockroach",
Tag: "latest",
Cmd: []string{"start-single-node", "--insecure"},
ExposedPorts: []string{"8080", "26257"},
})
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

port := resource.GetPort("26257/tcp")
dsn := fmt.Sprintf("postgres://root@localhost:%v/defaultdb?sslmode=disable", port)

var pgpool *pgxpool.Pool
require.NoError(t, resource.Expire(900))

require.NoError(t, pool.Retry(func() error {
if pgpool == nil {
if pgpool, err = pgxpool.Connect(context.Background(), dsn); err != nil {
return err
}
}
// Enable changefeeds
if _, err = pgpool.Exec(context.Background(), "SET CLUSTER SETTING kv.rangefeed.enabled = true;"); err != nil {
return err
}
// Create table
_, err = pgpool.Exec(context.Background(), "CREATE TABLE foo (a INT PRIMARY KEY);")
return err
}))
t.Cleanup(func() {
pgpool.Close()
})

cfdb, err := sql.Open("postgres", dsn)
require.NoError(t, err)

// Create a backlog of rows
i := 0
for ; i < 100; i++ {
// Insert some rows
if _, err = pgpool.Exec(context.Background(), fmt.Sprintf("INSERT INTO foo VALUES (%v);", i)); err != nil {
return
}
}

rowsCtx, done := context.WithCancel(context.Background())

rows, err := cfdb.QueryContext(rowsCtx, "EXPERIMENTAL CHANGEFEED FOR foo WITH UPDATED")
require.NoError(t, err)

var latestCursor string
for j := 0; j < 100; j++ {
require.True(t, rows.Next())

var a, b, c []byte
require.NoError(t, rows.Scan(&a, &b, &c))

gObj, err := gabs.ParseJSON(c)
require.NoError(t, err)

latestCursor, _ = gObj.S("updated").Data().(string)
assert.Equal(t, float64(j), gObj.S("after", "a").Data(), gObj.String())
}

done()

cfdb.Close()
rows.Close()

// Insert some more rows
for ; i < 150; i++ {
if _, err = pgpool.Exec(context.Background(), fmt.Sprintf("INSERT INTO foo VALUES (%v);", i)); err != nil {
t.Error(err)
}
}

// Create a new changefeed with a cursor set to the latest updated value
cfdb, err = sql.Open("postgres", dsn)
require.NoError(t, err)

rowsCtx, done = context.WithCancel(context.Background())

rows, err = cfdb.QueryContext(rowsCtx, "EXPERIMENTAL CHANGEFEED FOR foo WITH UPDATED, CURSOR=\""+latestCursor+"\"")
require.NoError(t, err)

for j := 0; j < 50; j++ {
require.True(t, rows.Next())

var a, b, c []byte
require.NoError(t, rows.Scan(&a, &b, &c))

gObj, err := gabs.ParseJSON(c)
require.NoError(t, err)

assert.Equal(t, float64(j+100), gObj.S("after", "a").Data(), gObj.String())
}

done()

cfdb.Close()
rows.Close()
}
Loading

0 comments on commit 489346d

Please sign in to comment.