Skip to content

Commit

Permalink
register workflows dynamically.
Browse files Browse the repository at this point in the history
  • Loading branch information
danli001 authored and s8sg committed Jul 29, 2023
1 parent e78fa2f commit 71a0238
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 58 deletions.
157 changes: 99 additions & 58 deletions runtime/flow_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"net/http"
"reflect"
"time"

"github.com/adjust/rmq/v4"
Expand Down Expand Up @@ -42,9 +43,10 @@ type FlowRuntime struct {

eventHandler sdk.EventHandler

taskQueues map[string]rmq.Queue
srv *http.Server
rdb *redis.Client
taskQueues map[string]rmq.Queue
srv *http.Server
rdb *redis.Client
rmqConnection rmq.Connection
}

type Worker struct {
Expand Down Expand Up @@ -131,6 +133,31 @@ func (fRuntime *FlowRuntime) CreateExecutor(req *runtime.Request) (executor.Exec
return ex, err
}

func (fRuntime *FlowRuntime) AppendFlows(flows map[string]FlowDefinitionHandler) error {

if reflect.ValueOf(fRuntime.rmqConnection).IsNil() {
return fmt.Errorf("unable to append flows, queue worker not started")
}

for flowName := range flows {
if _, ok := fRuntime.Flows[flowName]; ok {
return fmt.Errorf("flow %s already registered", flowName)
}
}

// register flows to runtime
for flowName, flowHandler := range flows {
fRuntime.Flows[flowName] = flowHandler
}

err := fRuntime.initializeTaskQueues(&fRuntime.rmqConnection, flows)
if err != nil {
return fmt.Errorf("failed to initialize task queues, error %v", err)
}

return nil
}

// OpenConnection opens and returns a new connection
func OpenConnectionV2(tag string, network string, address string, password string, db int, errChan chan<- error) (rmq.Connection, error) {
redisClient := redis2.NewClient(&redis2.Options{Network: network, Addr: address, Password: password, DB: db})
Expand Down Expand Up @@ -262,70 +289,21 @@ func (fRuntime *FlowRuntime) StopServer() error {

// StartQueueWorker starts listening for request in queue
func (fRuntime *FlowRuntime) StartQueueWorker(errorChan chan error) error {
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil)
connection, err := rmq.OpenConnection("goflow", "tcp", fRuntime.RedisURL, 0, nil)
if err != nil {
return fmt.Errorf("failed to initiate connection, error %v", err)
}
fRuntime.rmqConnection = connection

fRuntime.taskQueues = make(map[string]rmq.Queue)
for flowName := range fRuntime.Flows {
taskQueue, err := connection.OpenQueue(fRuntime.internalRequestQueueId(flowName))
if err != nil {
return fmt.Errorf("failed to open queue, error %v", err)
}

var pushQueues = make([]rmq.Queue, fRuntime.RetryQueueCount)
var previousQueue = taskQueue

index := 0
for index < fRuntime.RetryQueueCount {
pushQueues[index], err = connection.OpenQueue(fRuntime.internalRequestQueueId(flowName) + "push-" + fmt.Sprint(index))
if err != nil {
return fmt.Errorf("failed to open push queue, error %v", err)
}
previousQueue.SetPushQueue(pushQueues[index])
previousQueue = pushQueues[index]
index++
}

err = taskQueue.StartConsuming(10, time.Second)
if err != nil {
return fmt.Errorf("failed to start consumer taskQueue, error %v", err)
}
fRuntime.taskQueues[flowName] = taskQueue

index = 0
for index < fRuntime.RetryQueueCount {
err = pushQueues[index].StartConsuming(10, time.Second)
if err != nil {
return fmt.Errorf("failed to start consumer pushQ1, error %v", err)
}
index++
}

index = 0
for index < fRuntime.Concurrency {
_, err := taskQueue.AddConsumer(fmt.Sprintf("request-consumer-%d", index), fRuntime)
if err != nil {
return fmt.Errorf("failed to add consumer, error %v", err)
}
index++
}

index = 0
for index < fRuntime.RetryQueueCount {
_, err = pushQueues[index].AddConsumer(fmt.Sprintf("request-consumer-%d", index), fRuntime)
if err != nil {
return fmt.Errorf("failed to add consumer, error %v", err)
}
index++
}
err = fRuntime.initializeTaskQueues(&connection, fRuntime.Flows)
if err != nil {
return fmt.Errorf("failed to initiate task queues, error %v", err)
}

fRuntime.Logger.Log("[goflow] queue worker started successfully")

err = <-errorChan
<-connection.StopAllConsuming()
<-fRuntime.rmqConnection.StopAllConsuming()
return err
}

Expand Down Expand Up @@ -520,6 +498,69 @@ func (fRuntime *FlowRuntime) handleStopRequest(request *runtime.Request) error {
return nil
}

func (fRuntime *FlowRuntime) initializeTaskQueues(connection *rmq.Connection, flows map[string]FlowDefinitionHandler) error {

if fRuntime.taskQueues == nil {
fRuntime.taskQueues = make(map[string]rmq.Queue)
}

for flowName := range flows {
taskQueue, err := (*connection).OpenQueue(fRuntime.internalRequestQueueId(flowName))
if err != nil {
return fmt.Errorf("failed to open queue, error %v", err)
}

var pushQueues = make([]rmq.Queue, fRuntime.RetryQueueCount)
var previousQueue = taskQueue

index := 0
for index < fRuntime.RetryQueueCount {
pushQueues[index], err = (*connection).OpenQueue(fRuntime.internalRequestQueueId(flowName) + "push-" + fmt.Sprint(index))
if err != nil {
return fmt.Errorf("failed to open push queue, error %v", err)
}
previousQueue.SetPushQueue(pushQueues[index])
previousQueue = pushQueues[index]
index++
}

err = taskQueue.StartConsuming(10, time.Second)
if err != nil {
return fmt.Errorf("failed to start consumer taskQueue, error %v", err)
}
fRuntime.taskQueues[flowName] = taskQueue

index = 0
for index < fRuntime.RetryQueueCount {
err = pushQueues[index].StartConsuming(10, time.Second)
if err != nil {
return fmt.Errorf("failed to start consumer pushQ1, error %v", err)
}
index++
}

index = 0
for index < fRuntime.Concurrency {
_, err := taskQueue.AddConsumer(fmt.Sprintf("request-consumer-%d", index), fRuntime)
if err != nil {
return fmt.Errorf("failed to add consumer, error %v", err)
}
index++
}

index = 0
for index < fRuntime.RetryQueueCount {
_, err = pushQueues[index].AddConsumer(fmt.Sprintf("request-consumer-%d", index), fRuntime)
if err != nil {
return fmt.Errorf("failed to add consumer, error %v", err)
}
index++
}
}

return nil
}

func (fRuntime *FlowRuntime) internalRequestQueueId(flowName string) string {
return fmt.Sprintf("%s:%s", InternalRequestQueueInitial, flowName)
}
Expand Down
9 changes: 9 additions & 0 deletions v1/goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,15 @@ func (fs *FlowService) Register(flowName string, handler runtime.FlowDefinitionH
return nil
}

func (fs *FlowService) AppendFlows(flows map[string]runtime.FlowDefinitionHandler) error {
err := fs.runtime.AppendFlows(flows)
if err != nil {
return fmt.Errorf("failed to append flows: %s", err)
}

return nil
}

func (fs *FlowService) Start() error {
if len(fs.Flows) == 0 {
return fmt.Errorf("must register atleast one flow")
Expand Down

0 comments on commit 71a0238

Please sign in to comment.