Skip to content

Commit

Permalink
give correct publisher metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
mYmNeo committed Jun 19, 2023
1 parent c903578 commit b7499aa
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
3 changes: 2 additions & 1 deletion publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func (p *publisher) publishUpdates(reqs requests) {
kv := &pb.KV{
Key: y.ParseKey(k),
Value: y.SafeCopy(nil, e.Value),
Meta: []byte{e.UserMeta},
UserMeta: []byte{e.UserMeta},
Meta: []byte{e.meta},
ExpiresAt: e.ExpiresAt,
Version: y.ParseTs(k),
}
Expand Down
54 changes: 54 additions & 0 deletions publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,57 @@ func TestMultiplePrefix(t *testing.T) {
wg.Wait()
})
}

func TestPublisherMetaCheck(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
userBytes := make([]byte, 0)
metaBytes := make([]byte, 0)
var wg sync.WaitGroup
wg.Add(1)
var subWg sync.WaitGroup
subWg.Add(1)
go func() {
subWg.Done()
updates := 0
match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""}
err := db.Subscribe(context.Background(), func(kvs *pb.KVList) error {
for _, kv := range kvs.GetKv() {
if len(kv.UserMeta) > 0 {
userBytes = append(userBytes, kv.UserMeta[0])
}
if len(kv.Meta) > 0 {
metaBytes = append(metaBytes, kv.Meta[0])
}
updates++
}
if updates == 10 {
wg.Done()
}
return nil
}, []pb.Match{match})
if err != nil {
require.Equal(t, err.Error(), context.Canceled.Error())
}
}()
subWg.Wait()
for i := 0; i < 5; i++ {
require.NoError(t, db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
e = e.WithMeta(byte(i))
return txn.SetEntry(e)
}))
}
for i := 0; i < 5; i++ {
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.Delete([]byte(fmt.Sprintf("key%d", i)))
}))
}
wg.Wait()
for i := 0; i < 5; i++ {
require.Equal(t, byte(i), userBytes[i])
}
for i := 5; i < 10; i++ {
require.Equal(t, bitDelete, metaBytes[i]&bitDelete)
}
})
}

0 comments on commit b7499aa

Please sign in to comment.