Skip to content

Commit

Permalink
[ADDED] ListKeysFiltered form listing KV keys with multiple filters (#…
Browse files Browse the repository at this point in the history
…1711)

Co-authored-by: somratdutta <duttasomratand.com>
  • Loading branch information
somratdutta authored Dec 27, 2024
1 parent 6977981 commit ecb328a
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 0 deletions.
33 changes: 33 additions & 0 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ type (
// the key value store in a streaming fashion (on a channel).
ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error)

// ListKeysFiltered ListKeysWithFilters returns a KeyLister for filtered keys in the bucket.
ListKeysFiltered(ctx context.Context, filters ...string) (KeyLister, error)

// History will return all historical values for the key (up to
// KeyValueMaxHistory).
History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error)
Expand Down Expand Up @@ -1271,6 +1274,36 @@ func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error
return kl, nil
}

// ListKeysWithFilters returns a channel of keys matching the provided filters using WatchFiltered.
func (kv *kvs) ListKeysFiltered(ctx context.Context, filters ...string) (KeyLister, error) {
watcher, err := kv.WatchFiltered(ctx, filters)
if err != nil {
return nil, err
}

// Reuse the existing keyLister implementation
kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}

go func() {
defer close(kl.keys)
defer watcher.Stop()

for {
select {
case entry := <-watcher.Updates():
if entry == nil { // Indicates all initial values are received
return
}
kl.keys <- entry.Key()
case <-ctx.Done():
return
}
}
}()

return kl, nil
}

func (kl *keyLister) Keys() <-chan string {
return kl.keys
}
Expand Down
62 changes: 62 additions & 0 deletions jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,68 @@ func TestKeyValueListKeys(t *testing.T) {
}
}

func TestListKeysFiltered(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create Key-Value store.
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 2})
expectOk(t, err)

// Helper function to add key-value pairs.
putKeys := func(data map[string]string) {
for key, value := range data {
t.Helper()
_, err := kv.Put(ctx, key, []byte(value))
expectOk(t, err)
}
}

// Add key-value pairs.
putKeys(map[string]string{
"apple": "fruit",
"banana": "fruit",
"carrot": "vegetable",
})

// Use filters to list keys matching "apple".
filters := []string{"apple"}
keyLister, err := kv.ListKeysFiltered(ctx, filters...)
expectOk(t, err)

// Collect filtered keys from KeyLister
var filteredKeys []string
for key := range keyLister.Keys() {
filteredKeys = append(filteredKeys, key)
}

// Validate expected keys.
expectedKeys := []string{"apple"}
if len(filteredKeys) != len(expectedKeys) {
t.Fatalf("Expected %d filtered key(s), got %d", len(expectedKeys), len(filteredKeys))
}

for _, key := range expectedKeys {
if !contains(filteredKeys, key) {
t.Fatalf("Expected key %s in filtered keys, but not found", key)
}
}
}

func contains(slice []string, key string) bool {
for _, k := range slice {
if k == key {
return true
}
}
return false
}

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
Expand Down

0 comments on commit ecb328a

Please sign in to comment.