Skip to content

Commit

Permalink
logging
Browse files Browse the repository at this point in the history
  • Loading branch information
magiconair committed Oct 3, 2020
1 parent f5b9a52 commit 6934784
Showing 1 changed file with 36 additions and 21 deletions.
57 changes: 36 additions & 21 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package opcua

import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
Expand Down Expand Up @@ -186,14 +187,24 @@ func (s *Subscription) SetTriggering(triggeringItemID uint32, add, remove []uint

// republish executes a synchronous republish request.
func (s *Subscription) republish(req *ua.RepublishRequest) (*ua.RepublishResponse, error) {
log.Println("RepublishRequest:", toJSON(req))
var res *ua.RepublishResponse
err := s.c.sechan.SendRequest(req, s.c.Session().resp.AuthenticationToken, func(v interface{}) error {
return safeAssign(v, &res)
})
log.Println("RepublishResponse:", toJSON(res))
return res, err
}

func (s *Subscription) publish() (*ua.PublishResponse, error) {
func toJSON(v interface{}) string {
b, err := json.Marshal(v)
if err != nil {
return err.Error()
}
return string(b)
}

func (s *Subscription) sendPublishRequest() (*ua.PublishResponse, error) {
s.pendingAcksMux.RLock()
req := &ua.PublishRequest{
SubscriptionAcknowledgements: make([]*ua.SubscriptionAcknowledgement, len(s.pendingAcks)),
Expand All @@ -206,10 +217,12 @@ func (s *Subscription) publish() (*ua.PublishResponse, error) {
}
s.pendingAcksMux.RUnlock()

log.Println("PublishRequest:", toJSON(req))
var res *ua.PublishResponse
err := s.c.sendWithTimeout(req, s.publishTimeout(), func(v interface{}) error {
return safeAssign(v, &res)
})
log.Println("PublishResponse:", toJSON(res))
return res, err
}

Expand Down Expand Up @@ -262,7 +275,6 @@ func (s *Subscription) Run(ctx context.Context) {

publish:
for {
plog.Print("select")
select {
case <-ctx.Done():
plog.Println("ctx.Done()")
Expand Down Expand Up @@ -303,7 +315,6 @@ publish:
continue

default:
plog.Print("publish")
s.runPublish(cctx)
}
}
Expand All @@ -315,7 +326,6 @@ func (s *Subscription) runPublish(ctx context.Context) {
defer plog.Print("done")

for {
plog.Print("select")
select {
case <-ctx.Done():
plog.Print("ctx.Done()")
Expand All @@ -326,40 +336,42 @@ func (s *Subscription) runPublish(ctx context.Context) {
return

default:
plog.Printf("default")
s.pendingAcksMux.RLock()
plog.Printf("req: lastSeq=%d pendingAcks=%v", s.lastSequenceNumber, s.pendingAcks)
s.pendingAcksMux.RUnlock()

// send the next publish request
// note that res contains data even if an error was returned
res, err := s.publish()
res, err := s.sendPublishRequest()
switch {
case err == nil && res.SubscriptionID != s.SubscriptionID:
plog.Printf("Got notifs for other subscription %d. Skipping", res.SubscriptionID)
plog.Printf("error: got notifs for other subscription %d. Skipping", res.SubscriptionID)
continue

case err == ua.StatusBadSequenceNumberUnknown:
// todo(fs): this should only happen per in the status codes
// todo(fs): lets log this here to see
plog.Printf("Got error %s which should only happen in the ACK results", err)
plog.Printf("error: this should only happen when ACK'ing results: %s", err)

case err == ua.StatusBadTooManyPublishRequests:
// todo(fs): we have sent too many publish requests
// todo(fs): we need to slow down
plog.Printf("got %s. Sleeping for one second", err)
plog.Printf("error: sleeping for one second: %s", err)
time.Sleep(time.Second) // does this make sense

case err == ua.StatusBadTimeout:
// ignore and continue the loop
plog.Print("Timeout. ignoring")
plog.Printf("error: ignoring: %s", err)

case err == ua.StatusBadNoSubscription:
// All subscriptions have been deleted, but the publishing loop is still running
// The user will stop the loop or create subscriptions at his discretion
plog.Print("Subscription invalid. Waiting for publishing loop to stop")
plog.Printf("error: subscription id invalid. Waiting for publishing loop to stop")

case err != nil:
// irrecoverable error
s.c.notifySubscriptionsOfError(ctx, s.SubscriptionID, err)
plog.Printf("Notify error %s. Stopping publish loop", err)
plog.Printf("error: unrecoverable error. stopping publish loop: %s", err)
return
}

Expand All @@ -372,7 +384,7 @@ func (s *Subscription) runPublish(ctx context.Context) {
// we assume that the number of results in the response match
// the number of pending acks from the previous PublishRequest.
if len(pendingAcks) != len(res.Results) {
plog.Printf("Got %d results for pending ACKs, want %d", len(res.Results), len(pendingAcks))
plog.Printf("error: got %d results for pending ACKs but want %d", len(res.Results), len(pendingAcks))
// todo(fs): what should we do here?
pendingAcks = nil
}
Expand All @@ -386,14 +398,14 @@ func (s *Subscription) runPublish(ctx context.Context) {
// publish response ack'ed -> skip
case ua.StatusBadSubscriptionIDInvalid:
// old subscription id -> skip
plog.Printf("Old subscription id: %s", err)
plog.Printf("error: subscription id invalid. skipping: %s", err)
case ua.StatusBadSequenceNumberUnknown:
// server does not have the message in its retransmission queue anymore
plog.Printf("Server does not have notif %d anymore: %s", seqnr, err)
plog.Printf("error: notif %d not on server anymore: %s", seqnr, err)
default:
// otherwise, we try to ack again
notAcked = append(notAcked, seqnr)
plog.Printf("Retrying to ACK notif %d: %s", seqnr, err)
plog.Printf("retrying to ACK notif %d: %s", seqnr, err)
}
}
pendingAcks = notAcked
Expand All @@ -410,14 +422,17 @@ func (s *Subscription) runPublish(ctx context.Context) {
availSeqs = res.AvailableSequenceNumbers
)

plog.Printf("resp: lastSeq=%d nextSeq=%d thisSeq=%d availSeqs=%v notAcked=%v pendingAcks=%v",
lastSeq, nextSeq, thisSeq, availSeqs, notAcked, pendingAcks)

if thisSeq > nextSeq {
for seqnr := nextSeq; seqnr < thisSeq; seqnr++ {
if !uint32SliceContains(seqnr, availSeqs) {
plog.Printf("Missed notif %d but server no longer has it. Data loss", seqnr)
plog.Printf("error: missed notif %d but server no longer has it. Data loss", seqnr)
continue
}

plog.Printf("Requesting republish of missed notif %d", seqnr)
plog.Printf("requesting republish of missed notif %d", seqnr)
rpres, rperr := s.republish(&ua.RepublishRequest{
SubscriptionID: res.SubscriptionID,
RetransmitSequenceNumber: seqnr,
Expand All @@ -427,17 +442,17 @@ func (s *Subscription) runPublish(ctx context.Context) {
lastSeq = seqnr
pendingAcks = append(pendingAcks, seqnr)
s.c.notifySubscription(ctx, res.SubscriptionID, rpres.NotificationMessage)
plog.Printf("Received missed notif %d", seqnr)
plog.Printf("received missed notif %d", seqnr)
default:
lastSeq = seqnr
plog.Printf("Republish request for missed notif %d failed. Data loss: %s", seqnr, err)
plog.Printf("error: republish request for missed notif %d failed. Data loss: %s", seqnr, err)
}
}
}

if err == nil {
s.c.notifySubscription(ctx, res.SubscriptionID, res.NotificationMessage)
plog.Printf("notif %d", res.NotificationMessage.SequenceNumber)
plog.Printf("notif: %d", res.NotificationMessage.SequenceNumber)
lastSeq = res.NotificationMessage.SequenceNumber
pendingAcks = append(pendingAcks, res.NotificationMessage.SequenceNumber)
}
Expand Down

0 comments on commit 6934784

Please sign in to comment.