Skip to content

Commit

Permalink
eth/filters: ✨ pending logs ✨
Browse files Browse the repository at this point in the history
Pending logs are now filterable through the Go API. Filter API changed
such that each filter type has it's own bucket and adding filter
explicitly requires you specify the bucket to put it in.
  • Loading branch information
obscuren committed Feb 13, 2016
1 parent b05e472 commit 987c1a5
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 52 deletions.
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
go self.eventMux.Post(RemovedTransactionEvent{diff})
}
if len(deletedLogs) > 0 {
go self.eventMux.Post(RemovedLogEvent{deletedLogs})
go self.eventMux.Post(RemovedLogsEvent{deletedLogs})
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ func TestLogReorgs(t *testing.T) {
evmux := &event.TypeMux{}
blockchain, _ := NewBlockChain(db, FakePow{}, evmux)

subs := evmux.Subscribe(RemovedLogEvent{})
subs := evmux.Subscribe(RemovedLogsEvent{})
chain, _ := GenerateChain(genesis, db, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code).SignECDSA(key1)
Expand All @@ -1002,7 +1002,7 @@ func TestLogReorgs(t *testing.T) {
}

ev := <-subs.Chan()
if len(ev.Data.(RemovedLogEvent).Logs) == 0 {
if len(ev.Data.(RemovedLogsEvent).Logs) == 0 {
t.Error("expected logs")
}
}
7 changes: 6 additions & 1 deletion core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type TxPreEvent struct{ Tx *types.Transaction }
// TxPostEvent is posted when a transaction has been processed.
type TxPostEvent struct{ Tx *types.Transaction }

// PendingLogsEvent is posted pre mining and notifies of pending logs.
type PendingLogsEvent struct {
Logs vm.Logs
}

// NewBlockEvent is posted when a block has been imported.
type NewBlockEvent struct{ Block *types.Block }

Expand All @@ -40,7 +45,7 @@ type NewMinedBlockEvent struct{ Block *types.Block }
type RemovedTransactionEvent struct{ Txs types.Transactions }

// RemovedLogEvent is posted when a reorg happens
type RemovedLogEvent struct{ Logs vm.Logs }
type RemovedLogsEvent struct{ Logs vm.Logs }

// ChainSplit is posted when a new head is detected
type ChainSplitEvent struct {
Expand Down
29 changes: 22 additions & 7 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {

s.blockMu.Lock()
filter := New(s.chainDb)
id := s.filterManager.Add(filter)
id, err := s.filterManager.Add(filter, ChainFilter)
if err != nil {
return "", err
}

s.blockQueue[id] = &hashQueue{timeout: time.Now()}

filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
Expand Down Expand Up @@ -174,7 +178,11 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
defer s.transactionMu.Unlock()

filter := New(s.chainDb)
id := s.filterManager.Add(filter)
id, err := s.filterManager.Add(filter, PendingTxFilter)
if err != nil {
return "", err
}

s.transactionQueue[id] = &hashQueue{timeout: time.Now()}

filter.TransactionCallback = func(tx *types.Transaction) {
Expand All @@ -194,12 +202,16 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
}

// newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) int {
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
s.logMu.Lock()
defer s.logMu.Unlock()

filter := New(s.chainDb)
id := s.filterManager.Add(filter)
id, err := s.filterManager.Add(filter, LogFilter)
if err != nil {
return 0, err
}

s.logQueue[id] = &logQueue{timeout: time.Now()}

filter.SetBeginBlock(earliest)
Expand All @@ -215,7 +227,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
}
}

return id
return id, nil
}

// NewFilterArgs represents a request to create a new filter.
Expand Down Expand Up @@ -352,9 +364,12 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {

var id int
if len(args.Addresses) > 0 {
id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
} else {
id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
}
if err != nil {
return "", err
}

s.filterMapMu.Lock()
Expand Down
3 changes: 3 additions & 0 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package filters

import (
"math"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand All @@ -32,6 +33,8 @@ type AccountChange struct {

// Filtering interface
type Filter struct {
created time.Time

db ethdb.Database
begin, end int64
addresses []common.Address
Expand Down
101 changes: 74 additions & 27 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package filters

import (
"fmt"
"sync"
"time"

Expand All @@ -27,26 +28,47 @@ import (
"github.com/ethereum/go-ethereum/event"
)

// FilterType determines the type of filter and is used to put the filter in to
// the correct bucket when added.
type FilterType byte

const (
ChainFilter FilterType = iota // new block events filter
PendingTxFilter // pending transaction filter
LogFilter // new or removed log filter
PendingLogFilter // pending log filter
)

// FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
filterMu sync.RWMutex
filterId int
filters map[int]*Filter
created map[int]time.Time
sub event.Subscription

chainFilters map[int]*Filter
pendingTxFilters map[int]*Filter
logFilters map[int]*Filter
pendingLogFilters map[int]*Filter

// generic is an ugly hack for Get
generic map[int]*Filter

sub event.Subscription
}

// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
filters: make(map[int]*Filter),
created: make(map[int]time.Time),
chainFilters: make(map[int]*Filter),
pendingTxFilters: make(map[int]*Filter),
logFilters: make(map[int]*Filter),
pendingLogFilters: make(map[int]*Filter),
generic: make(map[int]*Filter),
}
fs.sub = mux.Subscribe(
//core.PendingBlockEvent{},
core.RemovedLogEvent{},
core.PendingLogsEvent{},
core.RemovedLogsEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
Expand All @@ -61,32 +83,49 @@ func (fs *FilterSystem) Stop() {
}

// Add adds a filter to the filter manager
func (fs *FilterSystem) Add(filter *Filter) (id int) {
func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
id = fs.filterId
fs.filters[id] = filter
fs.created[id] = time.Now()

id := fs.filterId
filter.created = time.Now()

switch filterType {
case ChainFilter:
fs.chainFilters[id] = filter
case PendingTxFilter:
fs.pendingTxFilters[id] = filter
case LogFilter:
fs.logFilters[id] = filter
case PendingLogFilter:
fs.pendingLogFilters[id] = filter
default:
return 0, fmt.Errorf("unknown filter type %v", filterType)
}
fs.generic[id] = filter

fs.filterId++

return id
return id, nil
}

// Remove removes a filter by filter id
func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()

delete(fs.filters, id)
delete(fs.created, id)
delete(fs.chainFilters, id)
delete(fs.pendingTxFilters, id)
delete(fs.logFilters, id)
delete(fs.pendingLogFilters, id)
delete(fs.generic, id)
}

// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()

return fs.filters[id]
return fs.generic[id]
}

// filterLoop waits for specific events from ethereum and fires their handlers
Expand All @@ -96,43 +135,51 @@ func (fs *FilterSystem) filterLoop() {
switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
for id, filter := range fs.filters {
if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
for _, filter := range fs.chainFilters {
if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()

case core.TxPreEvent:
fs.filterMu.RLock()
for id, filter := range fs.filters {
if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
for _, filter := range fs.pendingTxFilters {
if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
fs.filterMu.RUnlock()

case vm.Logs:
fs.filterMu.RLock()
for id, filter := range fs.filters {
if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
for _, filter := range fs.logFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false)
}
}
}
fs.filterMu.RUnlock()

case core.RemovedLogEvent:
case core.RemovedLogsEvent:
fs.filterMu.RLock()
for id, filter := range fs.filters {
if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
for _, filter := range fs.logFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range ev.Logs {
filter.LogCallback(removedLog, true)
}
}
}
fs.filterMu.RUnlock()
case core.PendingLogsEvent:
fs.filterMu.RLock()
for _, filter := range fs.pendingLogFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, pendingLog := range ev.Logs {
filter.LogCallback(pendingLog, false)
}
}
}
fs.filterMu.RUnlock()
}
}
}
26 changes: 20 additions & 6 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestCallbacks(t *testing.T) {
txDone = make(chan struct{})
logDone = make(chan struct{})
removedLogDone = make(chan struct{})
pendingLogDone = make(chan struct{})
)

blockFilter := &Filter{
Expand All @@ -37,24 +38,30 @@ func TestCallbacks(t *testing.T) {
}
},
}

removedLogFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) {
if oob {
close(removedLogDone)
}
},
}
pendingLogFilter := &Filter{
LogCallback: func(*vm.Log, bool) {
close(pendingLogDone)
},
}

fs.Add(blockFilter)
fs.Add(txFilter)
fs.Add(logFilter)
fs.Add(removedLogFilter)
fs.Add(blockFilter, ChainFilter)
fs.Add(txFilter, PendingTxFilter)
fs.Add(logFilter, LogFilter)
fs.Add(removedLogFilter, LogFilter)
fs.Add(pendingLogFilter, PendingLogFilter)

mux.Post(core.ChainEvent{})
mux.Post(core.TxPreEvent{})
mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}})
mux.Post(vm.Logs{&vm.Log{}})
mux.Post(core.RemovedLogsEvent{vm.Logs{&vm.Log{}}})
mux.Post(core.PendingLogsEvent{vm.Logs{&vm.Log{}}})

const dura = 5 * time.Second
failTimer := time.NewTimer(dura)
Expand Down Expand Up @@ -84,4 +91,11 @@ func TestCallbacks(t *testing.T) {
case <-failTimer.C:
t.Error("removed log filter failed to trigger (timeout)")
}

failTimer.Reset(dura)
select {
case <-pendingLogDone:
case <-failTimer.C:
t.Error("pending log filter failed to trigger (timout)")
}
}
Loading

0 comments on commit 987c1a5

Please sign in to comment.