Skip to content

Commit

Permalink
Merge pull request #69 from replicase/fix/fix-debounce-loss-message
Browse files Browse the repository at this point in the history
fix: fix loss message problem with debounce consumer handler with requeue behavior
  • Loading branch information
KennyChenFight authored Sep 16, 2024
2 parents cb4e454 + bc8c3f2 commit 98cec22
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions pkg/pgcapture/debounce.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,26 +96,45 @@ func (b *DebounceHandler) Handle(fn ModelAsyncHandlerFunc, checkpoint cursor.Che

switch change.Op {
case pb.Change_INSERT:
if prev, ok := b.store[debounceKey(change.New)]; ok {
key := debounceKey(change.New)
if prev, ok := b.store[key]; ok {
b.handle(prev)
delete(b.store, key)
}
b.handle(e)
case pb.Change_DELETE:
if prev, ok := b.store[debounceKey(change.Old)]; ok {
key := debounceKey(change.Old)
if prev, ok := b.store[key]; ok {
b.handle(prev)
delete(b.store, key)
}
b.handle(e)
case pb.Change_UPDATE:
if change.Old != nil {
if prev, ok := b.store[debounceKey(change.Old)]; ok {
key := debounceKey(change.Old)
if prev, ok := b.store[key]; ok {
b.handle(prev)
delete(b.store, key)
}
}
key := debounceKey(change.New)
if prev, ok := b.store[key]; ok {
b.source.Commit(prev.Checkpoint)
// since requeue order is not guaranteed, we need to check if the new event is newer than the previous one
// then we commit the previous one and store the new one
// workaround for the LSN == 0 issue because schedule dump lsn is 0 and should be always the latest event
// also, when the checkpoint is equal, we cannot commit the previous event because it might be a same event
if change.Checkpoint.LSN == 0 || change.Checkpoint.After(prev.Checkpoint) {
b.source.Commit(prev.Checkpoint)
b.store[key] = e
} else if change.Checkpoint.Equal(prev.Checkpoint) {
b.handle(prev)
b.store[key] = e
} else {
b.source.Commit(change.Checkpoint)
}
} else {
b.store[key] = e
}
b.store[key] = e
}
}

Expand Down

0 comments on commit 98cec22

Please sign in to comment.