Skip to content

Commit

Permalink
Analytics (#450)
Browse files Browse the repository at this point in the history
Mixpanel service, add more events, fix up events processing
  • Loading branch information
Dominic Wong authored Jun 2, 2021
1 parent f77525b commit 154be25
Show file tree
Hide file tree
Showing 22 changed files with 1,044 additions and 312 deletions.
66 changes: 53 additions & 13 deletions balance/handler/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/go-redis/redis/v8"
"github.com/google/uuid"
balance "github.com/m3o/services/balance/proto"
pb "github.com/m3o/services/balance/proto"
ns "github.com/m3o/services/namespaces/proto"
m3oauth "github.com/m3o/services/pkg/auth"
publicapi "github.com/m3o/services/publicapi/proto"
Expand All @@ -21,6 +21,7 @@ import (
"github.com/micro/micro/v3/service/client"
"github.com/micro/micro/v3/service/config"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/events"
"github.com/micro/micro/v3/service/logger"
log "github.com/micro/micro/v3/service/logger"
"github.com/micro/micro/v3/service/store"
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewHandler(svc *service.Service) *Balance {
return b
}

func (b Balance) Increment(ctx context.Context, request *balance.IncrementRequest, response *balance.IncrementResponse) error {
func (b Balance) Increment(ctx context.Context, request *pb.IncrementRequest, response *pb.IncrementResponse) error {
// increment counter
acc, err := m3oauth.VerifyMicroAdmin(ctx, "balance.Increment")
if err != nil {
Expand All @@ -163,10 +164,26 @@ func (b Balance) Increment(ctx context.Context, request *balance.IncrementReques
return err
}
response.NewBalance = currBal
if err := storeAdjustment(acc.ID, request.Delta, request.CustomerId, request.Reference, request.Visible, nil); err != nil {
adj, err := storeAdjustment(acc.ID, request.Delta, request.CustomerId, request.Reference, request.Visible, nil)
if err != nil {
return err
}

evt := pb.Event{
Type: pb.EventType_EventTypeIncrement,
Adjustment: &pb.Adjustment{
Id: adj.ID,
Created: adj.Created.Unix(),
Delta: adj.Amount,
Reference: adj.Reference,
Meta: adj.Meta,
},
CustomerId: adj.CustomerID,
}
if err := events.Publish(pb.EventsTopic, &evt); err != nil {
logger.Errorf("Error publishing event %+v", evt)
}

if currBal < 0 {
return nil
}
Expand All @@ -182,7 +199,7 @@ func (b Balance) Increment(ctx context.Context, request *balance.IncrementReques
return nil
}

func storeAdjustment(actionedBy string, delta int64, customerID, reference string, visible bool, meta map[string]string) error {
func storeAdjustment(actionedBy string, delta int64, customerID, reference string, visible bool, meta map[string]string) (*Adjustment, error) {

// record it
rec := &Adjustment{
Expand All @@ -197,19 +214,19 @@ func storeAdjustment(actionedBy string, delta int64, customerID, reference strin
}
adj, err := json.Marshal(rec)
if err != nil {
return err
return nil, err
}

if err := store.Write(&store.Record{
Key: fmt.Sprintf("%s/%s/%s", prefixStoreByCustomer, customerID, rec.ID),
Value: adj,
}); err != nil {
return err
return nil, err
}
return nil
return rec, nil
}

func (b Balance) Decrement(ctx context.Context, request *balance.DecrementRequest, response *balance.DecrementResponse) error {
func (b Balance) Decrement(ctx context.Context, request *pb.DecrementRequest, response *pb.DecrementResponse) error {
acc, err := m3oauth.VerifyMicroAdmin(ctx, "balance.Decrement")
if err != nil {
return err
Expand All @@ -225,13 +242,36 @@ func (b Balance) Decrement(ctx context.Context, request *balance.DecrementReques
}

response.NewBalance = currBal
if err := storeAdjustment(acc.ID, -request.Delta, request.CustomerId, request.Reference, request.Visible, nil); err != nil {
adj, err := storeAdjustment(acc.ID, -request.Delta, request.CustomerId, request.Reference, request.Visible, nil)
if err != nil {
return err
}

evt := pb.Event{
Type: pb.EventType_EventTypeDecrement,
Adjustment: &pb.Adjustment{
Id: adj.ID,
Created: adj.Created.Unix(),
Delta: adj.Amount,
Reference: adj.Reference,
Meta: adj.Meta,
},
CustomerId: adj.CustomerID,
}
if err := events.Publish(pb.EventsTopic, &evt); err != nil {
logger.Errorf("Error publishing event %+v", evt)
}

if currBal > 0 {
return nil
}
evt = pb.Event{
Type: pb.EventType_EventTypeZeroBalance,
CustomerId: adj.CustomerID,
}
if err := events.Publish(pb.EventsTopic, &evt); err != nil {
logger.Errorf("Error publishing event %+v", evt)
}
if _, err := b.v1Svc.BlockKey(context.Background(), &v1api.BlockKeyRequest{
UserId: request.CustomerId,
Namespace: microNamespace,
Expand All @@ -244,7 +284,7 @@ func (b Balance) Decrement(ctx context.Context, request *balance.DecrementReques
return nil
}

func (b Balance) Current(ctx context.Context, request *balance.CurrentRequest, response *balance.CurrentResponse) error {
func (b Balance) Current(ctx context.Context, request *pb.CurrentRequest, response *pb.CurrentResponse) error {
acc, err := m3oauth.VerifyMicroCustomer(ctx, "balance.Current")
if err != nil {
return err
Expand All @@ -267,7 +307,7 @@ func (b Balance) Current(ctx context.Context, request *balance.CurrentRequest, r
return nil
}

func (b Balance) ListAdjustments(ctx context.Context, request *balance.ListAdjustmentsRequest, response *balance.ListAdjustmentsResponse) error {
func (b Balance) ListAdjustments(ctx context.Context, request *pb.ListAdjustmentsRequest, response *pb.ListAdjustmentsResponse) error {
acc, err := m3oauth.VerifyMicroCustomer(ctx, "balance.ListAdjustments")
if err != nil {
// TODO check for micro admin
Expand All @@ -278,7 +318,7 @@ func (b Balance) ListAdjustments(ctx context.Context, request *balance.ListAdjus
return err
}

ret := []*balance.Adjustment{}
ret := []*pb.Adjustment{}
for _, rec := range recs {
var adj Adjustment
if err := json.Unmarshal(rec.Value, &adj); err != nil {
Expand All @@ -287,7 +327,7 @@ func (b Balance) ListAdjustments(ctx context.Context, request *balance.ListAdjus
if !adj.Visible {
continue
}
ret = append(ret, &balance.Adjustment{
ret = append(ret, &pb.Adjustment{
Id: adj.ID,
Created: adj.Created.Unix(),
Delta: adj.Amount,
Expand Down
177 changes: 64 additions & 113 deletions balance/handler/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package handler
import (
"context"
"encoding/json"
"fmt"
"time"

ns "github.com/m3o/services/namespaces/proto"
pb "github.com/m3o/services/balance/proto"
pevents "github.com/m3o/services/pkg/events"
stripepb "github.com/m3o/services/stripe/proto"
v1api "github.com/m3o/services/v1api/proto"
"github.com/micro/micro/v3/service/client"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/events"
mevents "github.com/micro/micro/v3/service/events"
"github.com/micro/micro/v3/service/logger"
)
Expand All @@ -20,76 +20,33 @@ const (
)

func (b *Balance) consumeEvents() {
processTopic := func(topic string, handler func(ch <-chan mevents.Event)) {
var evs <-chan mevents.Event
start := time.Now()
for {
var err error
evs, err = mevents.Consume(topic,
mevents.WithAutoAck(false, 30*time.Second),
mevents.WithRetryLimit(10),
mevents.WithGroup(fmt.Sprintf("%s-%s", "balance", topic))) // 10 retries * 30 secs ackWait gives us 5 mins of tolerance for issues
if err == nil {
handler(evs)
start = time.Now()
continue // if for some reason evs closes we loop and try subscribing again
}
// TODO fix me
if time.Since(start) > 2*time.Minute {
logger.Fatalf("Failed to subscribe to topic %s: %s", topic, err)
}
logger.Warnf("Unable to subscribe to topic %s. Will retry in 20 secs. %s", topic, err)
time.Sleep(20 * time.Second)
}
}
go processTopic("v1api", b.processV1apiEvents)
go processTopic("stripe", b.processStripeEvents)
go pevents.ProcessTopic("v1api", "balance", b.processV1apiEvents)
go pevents.ProcessTopic("stripe", "balance", b.processStripeEvents)
}

func (b *Balance) processV1apiEvents(ch <-chan mevents.Event) {
logger.Infof("Starting to process v1api events")
for {
t := time.NewTimer(600 * time.Minute)
var ev mevents.Event
select {
case ev = <-ch:
t.Stop()
if len(ev.ID) == 0 {
// channel closed
logger.Infof("Channel closed, retrying stream connection")
return
}
case <-t.C:
// safety net in case we stop receiving messages for some reason
logger.Infof("No messages received for last 2 minutes retrying connection")
return
func (b *Balance) processV1apiEvents(ev mevents.Event) error {
ve := &v1api.Event{}
if err := json.Unmarshal(ev.Payload, ve); err != nil {
logger.Errorf("Error unmarshalling v1api event: $s", err)
return nil
}
switch ve.Type {
case "APIKeyCreate":
if err := b.processAPIKeyCreated(ve.ApiKeyCreate); err != nil {
logger.Errorf("Error processing API key created event %s", err)
return err
}

ve := &v1api.Event{}
if err := json.Unmarshal(ev.Payload, ve); err != nil {
ev.Nack()
logger.Errorf("Error unmarshalling v1api event: $s", err)
continue
case "Request":
if err := b.processRequest(ve.Request); err != nil {
logger.Errorf("Error processing request event %s", err)
return err
}
switch ve.Type {
case "APIKeyCreate":
if err := b.processAPIKeyCreated(ve.ApiKeyCreate); err != nil {
ev.Nack()
logger.Errorf("Error processing API key created event %s", err)
continue
}
case "Request":
if err := b.processRequest(ve.Request); err != nil {
ev.Nack()
logger.Errorf("Error processing request event %s", err)
continue
}
default:
logger.Infof("Unrecognised event %+v", ve)
default:
logger.Infof("Unrecognised event %+v", ve)

}
ev.Ack()
}
return nil

}

func (b *Balance) processAPIKeyCreated(ac *v1api.APIKeyCreateEvent) error {
Expand Down Expand Up @@ -151,6 +108,14 @@ func (b *Balance) processRequest(rqe *v1api.RequestEvent) error {
return nil
}

evt := pb.Event{
Type: pb.EventType_EventTypeZeroBalance,
CustomerId: rqe.UserId,
}
if err := events.Publish(pb.EventsTopic, &evt); err != nil {
logger.Errorf("Error publishing event %+v", evt)
}

// no more money, cut them off
if _, err := b.v1Svc.BlockKey(context.TODO(), &v1api.BlockKeyRequest{
UserId: rqe.UserId,
Expand All @@ -168,45 +133,25 @@ func (b *Balance) processRequest(rqe *v1api.RequestEvent) error {
return nil
}

func (b *Balance) processStripeEvents(ch <-chan mevents.Event) {
logger.Infof("Starting to process stripe events")
for {
t := time.NewTimer(600 * time.Minute)
var ev mevents.Event
select {
case ev = <-ch:
t.Stop()
if len(ev.ID) == 0 {
// channel closed
logger.Infof("Channel closed, retrying stream connection")
return
}
case <-t.C:
// safety net in case we stop receiving messages for some reason
logger.Infof("No messages received for last 2 minutes retrying connection")
return
}

ve := &stripepb.Event{}
logger.Infof("Processing event %+v", ev)
if err := json.Unmarshal(ev.Payload, ve); err != nil {
ev.Nack()
logger.Errorf("Error unmarshalling stripe event: $s", err)
continue
func (b *Balance) processStripeEvents(ev mevents.Event) error {
ve := &stripepb.Event{}
logger.Infof("Processing event %+v", ev)
if err := json.Unmarshal(ev.Payload, ve); err != nil {
logger.Errorf("Error unmarshalling stripe event: $s", err)
return nil
}
switch ve.Type {
case "ChargeSucceeded":
if err := b.processChargeSucceeded(ve.ChargeSucceeded); err != nil {
logger.Errorf("Error processing charge succeeded event %s", err)
return err
}
switch ve.Type {
case "ChargeSucceeded":
if err := b.processChargeSucceeded(ve.ChargeSucceeded); err != nil {
ev.Nack()
logger.Errorf("Error processing charge succeeded event %s", err)
continue
}
default:
logger.Infof("Unrecognised event %+v", ve)
default:
logger.Infof("Unrecognised event %+v", ve)

}
ev.Ack()
}
return nil

}

func (b *Balance) processChargeSucceeded(ev *stripepb.ChargeSuceededEvent) error {
Expand All @@ -227,26 +172,32 @@ func (b *Balance) processChargeSucceeded(ev *stripepb.ChargeSuceededEvent) error
return err
}

err = storeAdjustment(ev.CustomerId, ev.Amount*100, ev.CustomerId, "Funds added", true, map[string]string{
adj, err := storeAdjustment(ev.CustomerId, ev.Amount*100, ev.CustomerId, "Funds added", true, map[string]string{
"receipt_url": srsp.Payment.ReceiptUrl,
})
if err != nil {
return err
}

// For now, builders have accounts issued by non micro namespace
rsp, err := b.nsSvc.List(context.Background(), &ns.ListRequest{
Owner: ev.CustomerId,
})
evt := pb.Event{
Type: pb.EventType_EventTypeIncrement,
Adjustment: &pb.Adjustment{
Id: adj.ID,
Created: adj.Created.Unix(),
Delta: adj.Amount,
Reference: adj.Reference,
Meta: adj.Meta,
},
CustomerId: adj.CustomerID,
}
if err := events.Publish(pb.EventsTopic, &evt); err != nil {
logger.Errorf("Error publishing event %+v", evt)
}

namespace := microNamespace
if err == nil {
// TODO at some point builders will actually have accounts issued from micro namespace
namespace = rsp.Namespaces[0].Id
}

// unblock key
if _, err := b.v1Svc.UnblockKey(context.TODO(), &v1api.UnblockKeyRequest{
if _, err := b.v1Svc.UnblockKey(context.Background(), &v1api.UnblockKeyRequest{
UserId: ev.CustomerId,
Namespace: namespace,
}, client.WithAuthToken()); err != nil {
Expand Down
Loading

0 comments on commit 154be25

Please sign in to comment.