Skip to content

Commit

Permalink
make browser.page public and optimize event observable type
Browse files Browse the repository at this point in the history
resolve go-rod#29
  • Loading branch information
ysmood committed May 14, 2020
1 parent 04fd7f0 commit 9b680d3
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 23 deletions.
20 changes: 10 additions & 10 deletions browser.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Browser struct {
monitorServer *kit.ServerContext

client *cdp.Client
event *kit.Observable // all the browser events from cdp client
event *Observable // all the browser events from cdp client
}

// New creates a controller
Expand Down Expand Up @@ -162,7 +162,7 @@ func (b *Browser) PageE(url string) (*Page, error) {
return nil, err
}

return b.page(target.TargetID)
return b.PageFromTargetIDE(target.TargetID)
}

// PagesE doc is the same as the method Pages
Expand All @@ -178,7 +178,7 @@ func (b *Browser) PagesE() (Pages, error) {
continue
}

page, err := b.page(target.TargetID)
page, err := b.PageFromTargetIDE(target.TargetID)
if err != nil {
return nil, err
}
Expand All @@ -191,13 +191,12 @@ func (b *Browser) PagesE() (Pages, error) {
// EventFilter to filter events
type EventFilter func(*cdp.Event) bool

// WaitEventE returns wait and cancel methods
// WaitEventE returns a channel that resolves the next event and close
func (b *Browser) WaitEventE(filter EventFilter) <-chan error {
wait := make(chan error)
go func() {
_, err := b.Event().Until(b.ctx, func(e kit.Event) bool {
event := e.(*cdp.Event)
return filter(event)
_, err := b.event.Until(b.ctx, func(e *cdp.Event) bool {
return filter(e)
})
wait <- err
close(wait)
Expand All @@ -207,7 +206,7 @@ func (b *Browser) WaitEventE(filter EventFilter) <-chan error {
}

// Event returns the observable for browser events
func (b *Browser) Event() *kit.Observable {
func (b *Browser) Event() *Observable {
return b.event
}

Expand Down Expand Up @@ -263,7 +262,8 @@ func (b *Browser) CallContext() (context.Context, proto.Client, string) {
return b.ctx, b.client, ""
}

func (b *Browser) page(targetID proto.TargetTargetID) (*Page, error) {
// PageFromTargetIDE creates a Page instance from a targetID
func (b *Browser) PageFromTargetIDE(targetID proto.TargetTargetID) (*Page, error) {
page := &Page{
ctx: b.ctx,
browser: b,
Expand All @@ -279,7 +279,7 @@ func (b *Browser) page(targetID proto.TargetTargetID) (*Page, error) {
}

func (b *Browser) initEvents() error {
b.event = kit.NewObservable()
b.event = NewObservable()

go func() {
for msg := range b.client.Event() {
Expand Down
3 changes: 1 addition & 2 deletions dev_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func (b *Browser) ServeMonitor(host string) *kit.ServerContext {
})
srv.Engine.GET("/screenshot/:id", func(ctx kit.GinContext) {
id := proto.TargetTargetID(ctx.Param("id"))
p, err := b.page(id)
kit.E(err)
p := b.PageFromTargetID(id)

ctx.Header("Content-Type", "image/png;")
kit.E(ctx.Writer.Write(p.Screenshot()))
Expand Down
24 changes: 24 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,27 @@ func Example_direct_cdp() {
// true
// rod=test;
}

// Shows how to subscribe events on the browser.
func Example_handle_events() {
browser := rod.New().Connect()
defer browser.Close()

go func() {
for e := range browser.Event().Subscribe().C {
created := &proto.TargetTargetCreated{}
if rod.Event(e, created) {
// create a page from the page id
page := browser.PageFromTargetID(created.TargetInfo.TargetID)

// set a global value on each page
page.Eval(`() => window.hey = "ok"`)
}
}
}()

// create a new page and get the value of "hey"
fmt.Println(browser.Page("https://github.com").WaitLoad().Eval(`() => hey`).String())

// Output: ok
}
112 changes: 112 additions & 0 deletions observable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package rod

import (
"context"
"sync"
"sync/atomic"

"github.com/ysmood/rod/lib/cdp"
)

// Observable is a thread-safe event helper
type Observable struct {
subscribers *sync.Map
idCount int64
}

// NewObservable creates a new observable
func NewObservable() *Observable {
return &Observable{
subscribers: &sync.Map{},
}
}

// Publish event to all subscribers, no internal goroutine is used,
// so the publish can block the goroutine. Use goroutine or buffer to prevent the blocking.
func (o *Observable) Publish(e *cdp.Event) {
o.subscribers.Range(func(_, s interface{}) (goOn bool) {
goOn = true
defer func() { _ = recover() }()
s.(*Subscriber).C <- e
return
})
}

// Subscribe returns a subscriber to emit events
func (o *Observable) Subscribe() *Subscriber {
id := atomic.AddInt64(&o.idCount, 1)

subscriber := &Subscriber{
C: make(chan *cdp.Event),
id: id,
}

o.subscribers.Store(id, subscriber)

return subscriber
}

// Unsubscribe from the observable
func (o *Observable) Unsubscribe(s *Subscriber) {
defer func() { _ = recover() }()

close(s.C)
o.subscribers.Delete(s.id)
}

// UnsubscribeAll current subscribers
func (o *Observable) UnsubscribeAll() {
defer func() { _ = recover() }()

o.subscribers.Range(func(_, s interface{}) bool {
o.Unsubscribe(s.(*Subscriber))
return true
})
}

// Count returns the number of subscribers
func (o *Observable) Count() int {
c := 0
o.subscribers.Range(func(key, value interface{}) bool {
c++
return true
})
return c
}

// Until check returns true keep waiting
func (o *Observable) Until(ctx context.Context, check func(*cdp.Event) bool) (*cdp.Event, error) {
s := o.Subscribe()
defer o.Unsubscribe(s)

for {
select {
case e := <-s.C:
if check(e) {
return e, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

// Subscriber of the observable
type Subscriber struct {
C chan *cdp.Event
id int64
}

// Filter events
func (s *Subscriber) Filter(filter func(*cdp.Event) bool) chan *cdp.Event {
filtered := make(chan *cdp.Event)
go func() {
for e := range s.C {
if filter(e) {
filtered <- e
}
}
close(filtered)
}()
return filtered
}
20 changes: 9 additions & 11 deletions page.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (p *Page) WaitPageE() func() (*Page, error) {
if err != nil {
return nil, err
}
return p.browser.Context(p.ctx).page(targetInfo.TargetID)
return p.browser.Context(p.ctx).PageFromTargetIDE(targetInfo.TargetID)
}
}

Expand All @@ -331,7 +331,7 @@ func (p *Page) PauseE() error {
// Use the includes and excludes regexp list to filter the requests by their url.
// Such as set n to 1 if there's a polling request.
func (p *Page) WaitRequestIdleE(d time.Duration, includes, excludes []string) func() error {
s := p.browser.Event().Subscribe()
s := p.browser.event.Subscribe()
done := false

return func() (err error) {
Expand All @@ -343,7 +343,7 @@ func (p *Page) WaitRequestIdleE(d time.Duration, includes, excludes []string) fu
if p.browser.trace {
defer p.Overlay(0, 0, 300, 0, "waiting for request idle "+strings.Join(includes, " "))()
}
defer p.browser.Event().Unsubscribe(s)
defer p.browser.event.Unsubscribe(s)

reqList := map[proto.NetworkRequestID]kit.Nil{}
timeout := time.NewTimer(d)
Expand All @@ -361,33 +361,31 @@ func (p *Page) WaitRequestIdleE(d time.Duration, includes, excludes []string) fu
return p.ctx.Err()
case <-timeout.C:
return
case msg, ok := <-s.C:
case e, ok := <-s.C:
if !ok {
return
}

sent := &proto.NetworkRequestWillBeSent{}
finished := &proto.NetworkLoadingFinished{}
failed := &proto.NetworkLoadingFailed{}
received := &proto.NetworkDataReceived{}
received := &proto.NetworkResponseReceived{}

e := msg.(*cdp.Event)
switch e.Method {
case sent.MethodName():
if Event(e, sent) {
timeout.Stop()
kit.E(json.Unmarshal(e.Params, sent))
url := sent.Request.URL
id := sent.RequestID
if matchWithFilter(url, includes, excludes) {
reqList[id] = kit.Nil{}
}
case finished.MethodName():
} else if Event(e, finished) {
kit.E(json.Unmarshal(e.Params, finished))
reset(finished.RequestID)
case failed.MethodName():
} else if Event(e, failed) {
kit.E(json.Unmarshal(e.Params, failed))
reset(failed.RequestID)
case received.MethodName():
} else if Event(e, received) {
kit.E(json.Unmarshal(e.Params, received))
reset(received.RequestID)
}
Expand Down
7 changes: 7 additions & 0 deletions sugar.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ func (b *Browser) Pages() Pages {
return list
}

// PageFromTargetID creates a Page instance from a targetID
func (b *Browser) PageFromTargetID(targetID proto.TargetTargetID) *Page {
p, err := b.PageFromTargetIDE(targetID)
kit.E(err)
return p
}

// WaitEvent resolves the wait function when the filter returns true
func (b *Browser) WaitEvent(e proto.Event) (wait func()) {
w := b.WaitEventE(NewEventFilter(e))
Expand Down
9 changes: 9 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ func CancelPanic(err error) {
}
}

// Event helps to convert a cdp.Event to proto.Event. Returns false if the convertion fails
func Event(msg *cdp.Event, evt proto.Event) bool {
if msg.Method == evt.MethodName() {
err := json.Unmarshal(msg.Params, evt)
return err == nil
}
return false
}

// NewEventFilter creates a event filter, when matches it will load data into the event object
func NewEventFilter(event proto.Event) EventFilter {
return func(e *cdp.Event) bool {
Expand Down

0 comments on commit 9b680d3

Please sign in to comment.