Skip to content

Commit

Permalink
Merge pull request #33 from air-go/develop
Browse files Browse the repository at this point in the history
add bloom filter
  • Loading branch information
why444216978 authored Aug 30, 2024
2 parents ee0dd3a + a19d152 commit 08a6917
Show file tree
Hide file tree
Showing 7 changed files with 502 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ require (
github.com/allegro/bigcache/v3 v3.1.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/bits-and-blooms/bloom/v3 v3.7.0 // indirect
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
Expand Down Expand Up @@ -130,6 +132,7 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/smartystreets/assertions v1.2.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88=
github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bloom/v3 v3.7.0 h1:VfknkqV4xI+PsaDIsoHueyxVDZrfvMn56jeWUzvzdls=
github.com/bits-and-blooms/bloom/v3 v3.7.0/go.mod h1:VKlUSvp0lFIYqxJjzdnSsZEw4iHb1kOL2tfHTgyJBHg=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
Expand Down Expand Up @@ -529,6 +533,8 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo=
github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo=
Expand Down Expand Up @@ -568,6 +574,7 @@ github.com/tevid/gohamcrest v1.1.1/go.mod h1:3UvtWlqm8j5JbwYZh80D/PVBt0mJ1eJiYgZ
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/turtlemonvh/gin-wraphh v0.0.0-20160304035037-ea8e4927b3a6 h1:LXn2Epw3+6TJxFwYbUo0M32riVbVfSTqCf41kJCmLb8=
github.com/turtlemonvh/gin-wraphh v0.0.0-20160304035037-ea8e4927b3a6/go.mod h1:9ACh8xdjPLhq9E8Rp80s6QqYzuAAx75ZNPLnjgtei6Q=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
Expand Down
14 changes: 14 additions & 0 deletions library/bloom/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bloom

import (
"context"
"time"
)

type Bloom interface {
Add(ctx context.Context, key string, data []byte, ttl time.Duration) error
// Check is return whether it exists or not
Check(ctx context.Context, key string, data []byte, ttl time.Duration) (bool, error)
// CheckAndAdd is return whether it exists, if not exists add.
CheckAndAdd(ctx context.Context, key string, data []byte, ttl time.Duration) (bool, error)
}
162 changes: 162 additions & 0 deletions library/bloom/localbloom/local_bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package localbloom

import (
"context"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/bits-and-blooms/bloom/v3"
"github.com/why444216978/go-util/assert"
"github.com/why444216978/go-util/nopanic"

lbloom "github.com/air-go/rpc/library/bloom"
)

type options struct {
n uint
fp float64
clock clock.Clock
releaseDuration time.Duration
}

func defaultOptions() *options {
return &options{
n: 10000,
fp: 0.01,
releaseDuration: time.Minute,
}
}

type OptionFunc func(*options)

func SetEstimateParameters(n uint, fp float64) OptionFunc {
return func(o *options) {
o.n = n
o.fp = fp
}
}

func SetClock(c clock.Clock) OptionFunc {
return func(o *options) { o.clock = c }
}

func SetReleaseDuration(d time.Duration) OptionFunc {
return func(o *options) { o.releaseDuration = d }
}

type MemoryBloom struct {
*options
blooms sync.Map
}

var _ lbloom.Bloom = (*MemoryBloom)(nil)

func NewMemoryBloom(opts ...OptionFunc) *MemoryBloom {
opt := defaultOptions()
for _, o := range opts {
o(opt)
}

mb := &MemoryBloom{
options: opt,
blooms: sync.Map{},
}
mb.tryRelease()

return mb
}

func (mb *MemoryBloom) Add(ctx context.Context, key string, data []byte, ttl time.Duration) error {
mb.getBloom(key).add(data, mb.now(), ttl)
return nil
}

func (mb *MemoryBloom) Check(ctx context.Context, key string, data []byte, ttl time.Duration) (bool, error) {
return mb.getBloom(key).check(data, mb.now(), ttl), nil
}

func (mb *MemoryBloom) CheckAndAdd(ctx context.Context, key string, data []byte, ttl time.Duration) (bool, error) {
return mb.getBloom(key).checkAndAdd(data, mb.now(), ttl), nil
}

func (mb *MemoryBloom) getBloom(k string) *keyBloom {
v, ok := mb.blooms.Load(k)
if ok {
return v.(*keyBloom)
}

b := newKeyBloom(k, mb.n, mb.fp)
mb.blooms.Store(k, b)
return b
}

func (mb *MemoryBloom) tryRelease() {
go nopanic.GoVoid(context.Background(), func() {
t := time.NewTicker(mb.releaseDuration)
for range t.C {
mb.blooms.Range(func(k, v any) bool {
if mb.now().After(v.(*keyBloom).getExpireAt()) {
mb.blooms.Delete(k)
}
return true
})
}
})
}

func (mb *MemoryBloom) now() time.Time {
if assert.IsNil(mb.clock) {
return time.Now()
}
return mb.clock.Now()
}

type keyBloom struct {
mu sync.Mutex
k string
n uint
fp float64
b *bloom.BloomFilter
expireAt time.Time
}

func newKeyBloom(k string, n uint, fp float64) *keyBloom {
return &keyBloom{
k: k,
n: n,
fp: fp,
b: bloom.NewWithEstimates(n, fp),
}
}

func (b *keyBloom) add(data []byte, now time.Time, ttl time.Duration) {
b.mu.Lock()
defer b.mu.Unlock()

b.expireAt = now.Add(ttl)
b.b.Add(data)
}

func (b *keyBloom) check(data []byte, now time.Time, ttl time.Duration) bool {
b.mu.Lock()
defer b.mu.Unlock()

b.expireAt = now.Add(ttl)
return b.b.Test(data)
}

func (b *keyBloom) checkAndAdd(data []byte, now time.Time, ttl time.Duration) bool {
b.mu.Lock()
defer b.mu.Unlock()

b.expireAt = now.Add(ttl)
return b.b.TestAndAdd(data)
}

func (b *keyBloom) getExpireAt() time.Time {
b.mu.Lock()
t := b.expireAt
b.mu.Unlock()
return t
}
51 changes: 51 additions & 0 deletions library/bloom/localbloom/local_bloom_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package localbloom

import (
"context"
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/stretchr/testify/assert"
)

func TestMemoryBloom(t *testing.T) {
ctx := context.Background()

c := clock.NewMock()
c.Set(time.Now())

b := NewMemoryBloom(
SetEstimateParameters(10000, 0.01),
SetClock(c),
SetReleaseDuration(time.Nanosecond),
)
key := "key"

ok, err := b.Check(ctx, key, []byte("abc"), time.Millisecond)
assert.Nil(t, err)
assert.Equal(t, false, ok)

err = b.Add(ctx, key, []byte("abc"), time.Millisecond)
assert.Nil(t, err)

ok, err = b.Check(ctx, key, []byte("abc"), time.Millisecond)
assert.Nil(t, err)
assert.Equal(t, true, ok)

ok, err = b.CheckAndAdd(ctx, key, []byte("abcd"), time.Millisecond)
assert.Nil(t, err)
assert.Equal(t, false, ok)

ok, err = b.Check(ctx, key, []byte("abcd"), time.Millisecond)
assert.Nil(t, err)
assert.Equal(t, true, ok)

c.Add(time.Minute * 2)
ok, err = b.Check(ctx, key, []byte("abc"), time.Millisecond)
assert.Nil(t, err)
assert.Equal(t, false, ok)
ok, err = b.Check(ctx, key, []byte("abcd"), time.Millisecond)
assert.Nil(t, err)
assert.Equal(t, false, ok)
}
Loading

0 comments on commit 08a6917

Please sign in to comment.