Skip to content

Commit

Permalink
Filter replicated databases (#407)
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson authored Sep 29, 2023
1 parent 672c7ea commit dce966e
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 11 deletions.
3 changes: 2 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type Client interface {
Commit(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64, r io.Reader) error

// Stream starts a long-running connection to stream changes from another node.
Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos) (Stream, error)
// If filter is specified, only those databases will be replicated.
Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (Stream, error)
}

// Stream represents a stream of frames.
Expand Down
3 changes: 3 additions & 0 deletions cmd/litefs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ type LeaseConfig struct {
// become primary again.
DemoteDelay time.Duration `yaml:"demote-delay"`

// Specifies a subset of databases to replica.
Databases []string `yaml:"databases"`

// Consul lease settings.
Consul struct {
URL string `yaml:"url"`
Expand Down
11 changes: 11 additions & 0 deletions cmd/litefs/mount_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type MountCommand struct {

// Used for generating the advertise URL for testing.
AdvertiseURLFn func() string

OnInitStore func()
}

// NewMountCommand returns a new instance of MountCommand.
Expand Down Expand Up @@ -158,6 +160,10 @@ func (c *MountCommand) Validate(ctx context.Context) (err error) {
return fmt.Errorf("invalid lease type, must be either 'consul' or 'static', got: '%v'", c.Config.Lease.Type)
}

if c.Config.Lease.Candidate && len(c.Config.Lease.Databases) > 0 {
return fmt.Errorf("cannot specify a database replication filter on candidate nodes")
}

return nil
}

Expand Down Expand Up @@ -362,8 +368,13 @@ func (c *MountCommand) initStore(ctx context.Context) error {
c.Store.ReconnectDelay = c.Config.Lease.ReconnectDelay
c.Store.DemoteDelay = c.Config.Lease.DemoteDelay
c.Store.Client = http.NewClient()
c.Store.DatabaseFilter = c.Config.Lease.Databases
c.initEnvironment(ctx)

if c.OnInitStore != nil {
c.OnInitStore()
}

if err := c.initStoreBackupClient(ctx); err != nil {
return err
}
Expand Down
34 changes: 34 additions & 0 deletions cmd/litefs/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,40 @@ func TestMultiNode_Autopromotion(t *testing.T) {
}
}

func TestMultiNode_DatabaseFilter(t *testing.T) {
cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil))
waitForPrimary(t, cmd0)
cmd1 := newMountCommand(t, t.TempDir(), cmd0)
cmd1.OnInitStore = func() {
cmd1.Store.DatabaseFilter = []string{"x.db"}
}
runMountCommand(t, cmd1)

db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "x.db"))
if _, err := db0.Exec(`CREATE TABLE t (x)`); err != nil {
t.Fatal(err)
} else if err := db0.Close(); err != nil {
t.Fatal(err)
}

db1 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "y.db"))
if _, err := db1.Exec(`CREATE TABLE t (y)`); err != nil {
t.Fatal(err)
} else if err := db1.Close(); err != nil {
t.Fatal(err)
}

waitForSync(t, "x.db", cmd0, cmd1)

// Only the filtered database should exist.
if _, err := os.Stat(filepath.Join(cmd1.Config.FUSE.Dir, "x.db")); err != nil {
t.Fatal(err)
}
if _, err := os.Stat(filepath.Join(cmd1.Config.FUSE.Dir, "y.db")); !os.IsNotExist(err) {
t.Fatal("expected second database to not exist on replica")
}
}

func TestMultiNode_StaticLeaser(t *testing.T) {
dir0, dir1 := t.TempDir(), t.TempDir()
cmd0 := newMountCommand(t, dir0, nil)
Expand Down
15 changes: 11 additions & 4 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"

"github.com/superfly/litefs"
"github.com/superfly/litefs/internal/chunk"
Expand Down Expand Up @@ -354,7 +355,7 @@ func (c *Client) Commit(ctx context.Context, primaryURL string, nodeID uint64, n
}

// Stream returns a snapshot and continuous stream of WAL updates.
func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error) {
func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error) {
u, err := url.Parse(primaryURL)
if err != nil {
return nil, fmt.Errorf("invalid client URL: %w", err)
Expand All @@ -364,11 +365,17 @@ func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, p
return nil, fmt.Errorf("URL host required")
}

q := make(url.Values)
if len(filter) > 0 {
q.Set("filter", strings.Join(filter, ","))
}

// Strip off everything but the scheme & host.
*u = url.URL{
Scheme: u.Scheme,
Host: u.Host,
Path: "/stream",
Scheme: u.Scheme,
Host: u.Host,
Path: "/stream",
RawQuery: q.Encode(),
}

var buf bytes.Buffer
Expand Down
18 changes: 18 additions & 0 deletions http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Upgrade to HTTP/2 required", http.StatusUpgradeRequired)
return
}
q := r.URL.Query()

// Prevent nodes from connecting to themselves.
id, _ := litefs.ParseNodeID(r.Header.Get(HeaderNodeID))
Expand Down Expand Up @@ -536,6 +537,14 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
dirtySet[db.Name()] = struct{}{}
}

// Determine filtered set of databases, if any.
filterSet := make(map[string]struct{})
if filter := q.Get("filter"); filter != "" {
for _, name := range strings.Split(filter, ",") {
filterSet[name] = struct{}{}
}
}

// Flush header so client can resume control.
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
Expand All @@ -554,6 +563,15 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
var readySent bool
var handoffLeaseID string
for {
// Restrict dirty set to only databases in the filter set.
if len(filterSet) > 0 && len(dirtySet) > 0 {
for name := range dirtySet {
if _, ok := filterSet[name]; !ok {
delete(dirtySet, name)
}
}
}

// Send pending transactions for each database.
for name := range dirtySet {
if err := s.streamDB(r.Context(), w, name, posMap); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Client struct {
AcquireHaltLockFunc func(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) (*litefs.HaltLock, error)
ReleaseHaltLockFunc func(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) error
CommitFunc func(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64, r io.Reader) error
StreamFunc func(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error)
StreamFunc func(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error)
}

func (c *Client) AcquireHaltLock(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) (*litefs.HaltLock, error) {
Expand All @@ -29,8 +29,8 @@ func (c *Client) Commit(ctx context.Context, primaryURL string, nodeID uint64, n
return c.CommitFunc(ctx, primaryURL, nodeID, name, lockID, r)
}

func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error) {
return c.StreamFunc(ctx, primaryURL, nodeID, posMap)
func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error) {
return c.StreamFunc(ctx, primaryURL, nodeID, posMap, filter)
}

type Stream struct {
Expand Down
5 changes: 4 additions & 1 deletion store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ type Store struct {
// Interface to interact with the host environment.
Environment Environment

// Specifies a subset of databases to replicate from the primary.
DatabaseFilter []string

// If true, computes and verifies the checksum of the entire database
// after every transaction. Should only be used during testing.
StrictVerify bool
Expand Down Expand Up @@ -1357,7 +1360,7 @@ func (s *Store) monitorLeaseAsReplica(ctx context.Context, info PrimaryInfo) (ha
}()

posMap := s.PosMap()
st, err := s.Client.Stream(ctx, info.AdvertiseURL, s.id, posMap)
st, err := s.Client.Stream(ctx, info.AdvertiseURL, s.id, posMap, s.DatabaseFilter)
if err != nil {
return "", fmt.Errorf("connect to primary: %s ('%s')", err, info.AdvertiseURL)
}
Expand Down
4 changes: 2 additions & 2 deletions store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestStore_PrimaryCtx(t *testing.T) {
}

client := mock.Client{
StreamFunc: func(ctx context.Context, rawurl string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error) {
StreamFunc: func(ctx context.Context, rawurl string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error) {
return &mock.Stream{
ReadCloser: io.NopCloser(&bytes.Buffer{}),
ClusterIDFunc: func() string { return "" },
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestStore_PrimaryCtx(t *testing.T) {
t.Run("InitialReplica", func(t *testing.T) {
leaser := litefs.NewStaticLeaser(false, "localhost", "http://localhost:20202")
client := mock.Client{
StreamFunc: func(ctx context.Context, rawurl string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error) {
StreamFunc: func(ctx context.Context, rawurl string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error) {
var buf bytes.Buffer
if err := litefs.WriteStreamFrame(&buf, &litefs.ReadyStreamFrame{}); err != nil {
return nil, err
Expand Down

0 comments on commit dce966e

Please sign in to comment.