Skip to content

Commit

Permalink
Showing 4 changed files with 59 additions and 4 deletions.
1 change: 1 addition & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
@@ -822,6 +822,7 @@ func (x *Start) Execute(args []string) error {

core.Node.StartMessageRetriever()
core.Node.StartPointerRepublisher()
core.Node.StartDisputeNotifier()

if !x.DisableWallet {
// If the wallet doesn't allow resyncing from a specific height to scan for unpaid orders, wait for all messages to process before continuing.
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
@@ -99,6 +99,10 @@ type OpenBazaarNode struct {
// Last ditch API to find records that dropped out of the DHT
IPNSBackupAPI string

// DisputeNotifier is a worker that walks the cases datastore to
// notify the user as disputes age past certain thresholds
DisputeNotifier *disputeNotifier

TestnetEnable bool
RegressionTestEnable bool
}
57 changes: 53 additions & 4 deletions core/dispute_notifier.go
Original file line number Diff line number Diff line change
@@ -5,14 +5,61 @@ import (
"time"

"github.com/OpenBazaar/openbazaar-go/repo"
"github.com/op/go-logging"
)

type disputeNotifier struct {
disputeCasesDB repo.CaseStore
notificationsDB repo.NotificationStore

intervalDelay time.Duration
logger *logging.Logger
runCount int
watchdogTimer *time.Ticker
stopWorker chan bool
}

func (n *OpenBazaarNode) StartDisputeNotifier() {
n.DisputeNotifier = &disputeNotifier{
disputeCasesDB: n.Datastore.Cases(),
notificationsDB: n.Datastore.Notifications(),
intervalDelay: time.Duration(10) * time.Minute,
logger: logging.MustGetLogger("disputeNotifier"),
}
go n.DisputeNotifier.Run()
}

func (d *disputeNotifier) RunCount() int { return d.runCount }

func (d *disputeNotifier) Run() {
d.watchdogTimer = time.NewTicker(d.intervalDelay)
d.stopWorker = make(chan bool)

// Run once on start, then wait for watchdog
if err := d.PerformTask(); err != nil {
d.logger.Error("performTask failure:", err.Error())
}
for {
select {
case <-d.watchdogTimer.C:
if err := d.PerformTask(); err != nil {
d.logger.Error("performTask failure:", err.Error())
}
case <-d.stopWorker:
d.watchdogTimer.Stop()
return
}
}
}

func (d *disputeNotifier) Stop() {
d.stopWorker <- true
close(d.stopWorker)
}

func (d *disputeNotifier) PerformTask() error {
d.runCount += 1
d.logger.Infof("performTask started (count %d)", d.runCount)
disputes, err := d.disputeCasesDB.GetDisputesForNotification()
if err != nil {
return err
@@ -57,26 +104,28 @@ func (d *disputeNotifier) PerformTask() error {
for _, n := range notificationsToAdd {
var serializedNotification, err = n.MarshalNotificationToJSON()
if err != nil {
// TODO: Log error
d.logger.Warning("marshaling notification:", err.Error())
d.logger.Infof("failed marshal: %+v", n)
continue
}
var template = "insert into notifications(notifID, serializedNotification, type, timestamp, read) values(?,?,?,?,?)"
_, err = notificationTx.Exec(template, n.GetID(), serializedNotification, n.GetDowncaseType(), n.GetSQLTimestamp(), 0)
if err != nil {
// TODO: Log error
d.logger.Warning("inserting notification:", err.Error())
d.logger.Infof("failed insert: %+v", n)
continue
}
}

if err = notificationTx.Commit(); err != nil {
// TODO: Log error
if rollbackErr := notificationTx.Rollback(); rollbackErr != nil {
// TODO: Log error
err = fmt.Errorf(err.Error(), "\nand also failed during rollback:", rollbackErr.Error())
}
return fmt.Errorf("commiting notifications:", err.Error())
}
d.logger.Infof("created %d dispute notifications", len(notificationsToAdd))

err = d.disputeCasesDB.UpdateDisputesLastNotifiedAt(disputes)
d.logger.Infof("updated lastNotifiedAt on %d disputes", len(disputes))
return nil
}
1 change: 1 addition & 0 deletions openbazaard.go
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ func main() {
log.Info("OpenBazaar Server shutting down...")
if core.Node != nil {
if core.Node.MessageRetriever != nil {
core.Node.DisputeNotifier.Stop()
close(core.Node.MessageRetriever.DoneChan)
core.Node.MessageRetriever.Wait()
}

0 comments on commit 1ba8615

Please sign in to comment.