Skip to content

Commit

Permalink
eth/filter: add support for pending logs (ethereum#3219)
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk authored and obscuren committed Nov 28, 2016
1 parent 318ad3c commit b5be6b7
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 92 deletions.
80 changes: 47 additions & 33 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,17 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()
var (
rpcSub = notifier.CreateSubscription()
matchedLogs = make(chan []Log)
)

logsSub, err := api.events.SubscribeLogs(crit, matchedLogs)
if err != nil {
return nil, err
}

go func() {
matchedLogs := make(chan []Log)
logsSub := api.events.SubscribeLogs(crit, matchedLogs)

for {
select {
Expand Down Expand Up @@ -276,18 +282,20 @@ type FilterCriteria struct {
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
//
// Default criteria for the from and to block are "latest".
// Using "latest" as block number will return logs for mined blocks.
// Using "pending" as block number returns logs for not yet mined (pending) blocks.
// In case logs are removed (chain reorg) previously returned logs are returned
// again but with the removed property set to true.
//
// In case "fromBlock" > "toBlock" an error is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
var (
logs = make(chan []Log)
logsSub = api.events.SubscribeLogs(crit, logs)
)

if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
logs := make(chan []Log)
logsSub, err := api.events.SubscribeLogs(crit, logs)
if err != nil {
return rpc.ID(""), err
}

api.filtersMu.Lock()
Expand All @@ -312,7 +320,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
}
}()

return logsSub.ID
return logsSub.ID, nil
}

// GetLogs returns logs matching the given argument that are stored within the state.
Expand Down Expand Up @@ -363,28 +371,38 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log
api.filtersMu.Unlock()

if !found || f.typ != LogsSubscription {
return []Log{}, nil
return nil, fmt.Errorf("filter not found")
}

filter := New(api.backend, api.useMipMap)
filter.SetBeginBlock(f.crit.FromBlock.Int64())
filter.SetEndBlock(f.crit.ToBlock.Int64())
if f.crit.FromBlock != nil {
filter.SetBeginBlock(f.crit.FromBlock.Int64())
} else {
filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
}
if f.crit.ToBlock != nil {
filter.SetEndBlock(f.crit.ToBlock.Int64())
} else {
filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
}
filter.SetAddresses(f.crit.Addresses)
filter.SetTopics(f.crit.Topics)

logs, err := filter.Find(ctx)
return returnLogs(logs), err
logs, err:= filter.Find(ctx)
if err != nil {
return nil, err
}
return returnLogs(logs), nil
}

// GetFilterChanges returns the logs for the filter with the given id since
// last time is was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log. If the filter could not be found
// []interface{}{} is returned.
// (pending)Log filters return []Log.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()

Expand All @@ -400,15 +418,15 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
case PendingTransactionsSubscription, BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes)
case PendingLogsSubscription, LogsSubscription:
return returnHashes(hashes), nil
case LogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs)
return returnLogs(logs), nil
}
}

return []interface{}{}
return []interface{}{}, fmt.Errorf("filter not found")
}

// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
Expand Down Expand Up @@ -443,15 +461,11 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
return err
}

if raw.From == nil || raw.From.Int64() < 0 {
args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
if raw.From != nil {
args.FromBlock = big.NewInt(raw.From.Int64())
}

if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
if raw.ToBlock != nil {
args.ToBlock = big.NewInt(raw.ToBlock.Int64())
}

Expand Down
8 changes: 4 additions & 4 deletions eth/filters/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &test0); err != nil {
t.Fatal(err)
}
if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() {
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock)
if test0.FromBlock != nil {
t.Fatalf("expected nil, got %d", test0.FromBlock)
}
if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() {
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock)
if test0.ToBlock != nil {
t.Fatalf("expected nil, got %d", test0.ToBlock)
}
if len(test0.Addresses) != 0 {
t.Fatalf("expected 0 addresses, got %d", len(test0.Addresses))
Expand Down
15 changes: 11 additions & 4 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"math"
"time"

"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -162,7 +164,7 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, er
}
unfiltered = append(unfiltered, rl...)
}
logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...)
logs = append(logs, filterLogs(unfiltered, nil, nil, f.addresses, f.topics)...)
}
}

Expand All @@ -179,12 +181,18 @@ func includes(addresses []common.Address, a common.Address) bool {
return false
}

func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log {
func filterLogs(logs []Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []Log {
var ret []Log

// Filter the logs for interesting stuff
Logs:
for _, log := range logs {
if fromBlock != nil && fromBlock.Int64() >= 0 && uint64(fromBlock.Int64()) > log.BlockNumber {
continue
}
if toBlock != nil && toBlock.Int64() >= 0 && uint64(toBlock.Int64()) < log.BlockNumber {
continue
}

if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
Expand All @@ -211,7 +219,6 @@ Logs:
continue Logs
}
}

ret = append(ret, log)
}

Expand Down
Loading

0 comments on commit b5be6b7

Please sign in to comment.