Skip to content

Commit

Permalink
use hard coded ha
Browse files Browse the repository at this point in the history
  • Loading branch information
Diego Balduini authored and Diego Balduini committed Apr 14, 2020
1 parent 468e2b6 commit 34a64e9
Show file tree
Hide file tree
Showing 15 changed files with 266 additions and 241 deletions.
29 changes: 13 additions & 16 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,35 @@ services:
darko-master:
image: darko:latest
env_file:
- compose.env
- master.env
ports:
- "8888:8888"
environment:
- DARKO_HA_MASTER_NODE=true
- DARKO_HA_MASTER_PORT=8888
depends_on:
- redis

darko-follower-1:
darko-follower-0:
image: darko:latest
environment:
- RATE_LIMIT=33
env_file:
- compose.env
- follower.env
environment:
- DARKO_RATE_LIMIT_SHARD=0
depends_on:
- redis

darko-follower-2:
darko-follower-1:
image: darko:latest
environment:
- RATE_LIMIT=33
env_file:
- compose.env
- follower.env
environment:
- DARKO_RATE_LIMIT_SHARD=1
depends_on:
- redis

darko-follower-3:
darko-follower-2:
image: darko:latest
environment:
- RATE_LIMIT=33
env_file:
- compose.env
- follower.env
environment:
- DARKO_RATE_LIMIT_SHARD=2
depends_on:
- redis
37 changes: 21 additions & 16 deletions dotenv/dotenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ var (
quoteSurroundedRegexp = regexp.MustCompile(`^['"]|['"]$`)
)

type Env string

func (e Env) Int() int {
i, err := strconv.Atoi(string(e))
if err != nil {
panic(err)
}
return i
}

func (e Env) String() string {
return string(e)
}

// LoadEnvFromConfigFile loads dotenv from "CONFIG_FILE" environment variable if it is set.
func LoadEnvFromConfigFile() error {
if f, ok := os.LookupEnv("CONFIG_FILE"); ok {
Expand Down Expand Up @@ -84,31 +98,22 @@ func Validate(vars []string) (string, bool) {
}

// GetOrElse returns the value for the environment value or the default value.
func GetOrElse(key, defaultValue string) string {
func GetOrElse(key, defaultValue string) Env {
if v, ok := os.LookupEnv(key); ok {
return v
return Env(v)
}
return defaultValue
return Env(defaultValue)
}

// Get returns the value for the key if present or panic otherwise
func Get(key string) string {
// MustGet returns the value for the key if present or panic otherwise
func MustGet(key string) Env {
if v, ok := os.LookupEnv(key); ok {
return v
return Env(v)
}
panic(1)
}

// GetInt returns the value for the key as int if present or panic otherwise
func GetInt(key string) int {
i, err := strconv.Atoi(Get(key))
if err != nil {
panic(err)
}
return i
}

// GetBool returns the value for the key as bool if present or false otherwise
func GetBool(key string) bool {
return Get(key) == "true"
return GetOrElse(key, "false") == "true"
}
11 changes: 3 additions & 8 deletions fifo/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fifo

import (
"errors"
"github.com/dbalduini/darko/shard"
)

var (
Expand All @@ -12,17 +11,13 @@ var (
TopicJobsNew = "darko:jobs:new"
TopicJobsProcessed = "darko:jobs:processed"
TopicJobsFailed = "darko:jobs:failed"
TopicShardsNode = "darko:shards:node"

)

// Queue defines a FIFO queue interface
type Queue interface {
// Pop dequeue the next item of the topic.
Pop(topic string) (shard.Job, error)
Pop(topic string) (string, error)
// Push enqueues the item,
Push(topic string, p shard.Job) error

AddShardNode() (int, error)

ShardCount() (int, error)
Push(topic string, pack string) error
}
61 changes: 61 additions & 0 deletions fifo/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package fifo

import (
"bytes"
"encoding/base64"
"encoding/gob"
"hash/fnv"
"io"
"strings"
)

// Job holds data for creating jobs.
// A job defines something that will be queued and dequeued. In HA mode, a job
// will also be uniform partitioned on shards.
type Job struct {
// ID is the job unique id. Set by the server.
ID string
// PartitionKey is the partition id this job belongs to. Set by the server.
PartitionKey int
// PrimaryKey is the primary key.
// It is used to guarantee the order of the requests and balance the work load.
PrimaryKey string `json:"primary_key"`
// CallbackURL is the service endpoint to send the payload back to.
CallbackURL string `json:"callback_url"`
// CorrelationID is a unique identifier attached to the request by the client that allow
// reference to a particular transaction or event chain.
CorrelationID string `json:"correlation_id"`
// Payload with the content encoded as base64 that will be forwarded on the client on the callback.
Payload string `json:"payload"`
}

// Hash returns hash code for the primary key
func (j Job) Hash() int {
h := fnv.New32a()
h.Write([]byte(j.PrimaryKey))
return int(h.Sum32())
}

func (j Job) NewPayloadReader() io.Reader {
return strings.NewReader(j.Payload)
}

// Unpack the job from the queue after dequeue.
func Unpack(pack string, job *Job) error {
buf, err := base64.StdEncoding.DecodeString(pack)
if err != nil {
return err
}
d := gob.NewDecoder(bytes.NewBuffer(buf))
return d.Decode(job)
}

// Pack the job as base64 string to be enqueued
func Pack(job Job) (string, error) {
var buf bytes.Buffer
e := gob.NewEncoder(&buf)
if err := e.Encode(job); err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}
29 changes: 6 additions & 23 deletions fifo/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"time"

"github.com/go-redis/redis"

"github.com/dbalduini/darko/shard"
)

// RedisQueue implements the Queue interface using Redis database as queue.
Expand All @@ -22,37 +20,22 @@ func NewRedisQueue(cli *redis.Client, blockTimeout time.Duration) *RedisQueue {
// Pop returns the left most item of the list.
// The command BLPop is used to block the queue when it is empty until the timeout.
// The item must be added by the producer with RPUSH.
func (r *RedisQueue) Pop(topic string) (shard.Job, error) {
job := shard.Job{}
func (r *RedisQueue) Pop(topic string) (string, error) {
res, err := r.cli.BLPop(r.blockTimeout, topic).Result()
if err == redis.Nil {
return job, ErrEmptyQueue
return "", ErrEmptyQueue
}
if err != nil {
return job, err
return "", err
}
// the zero index contains the list name
s := res[1]
err = shard.Unpack(s, &job)
return job, err
return res[1], err
}

// Push enqueues the item pushing it on the right side of the list.
// This is it can be dequeue my LPOP or BLPop in FIFO order.
func (r *RedisQueue) Push(topic string, job shard.Job) error {
s := shard.Pack(job)
_, err := r.cli.RPush(topic, s).Result()
func (r *RedisQueue) Push(topic string, pack string) error {
_, err := r.cli.RPush(topic, pack).Result()
return err
}

// AddShardNode register the node and returns the partition key
func (r *RedisQueue) AddShardNode() (int, error) {
p, err := r.cli.Incr(TopicShardsNode).Result()
return int(p), err
}

// AddShardNode register the node and returns the partition key
func (r *RedisQueue) ShardCount() (int, error) {
p, err := r.cli.Get(TopicShardsNode).Int()
return p, err
}
12 changes: 7 additions & 5 deletions compose.env → follower.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ REDIS_ADDRESS=redis:6379
REDIS_CON_POOL=100
REDIS_CON_PASSWORD=
REDIS_BLOCK_TIMEOUT_SECONDS=2
# HA

# HA - Follower
DARKO_HA_MODE=true
DARKO_HA_MASTER_NODE=false
# Vertical Scaling
TOTAL_WORKERS=10
# callback with delay simulation
CALLBACK_URL=https://www.mocky.io/v2/5e94bc9131000030ce5e3480?mocky-delay=300ms

# Rate Limiter
DARKO_RATE_LIMIT_POINTS=33
DARKO_RATE_LIMIT_WORKERS_COUNT=10
DARKO_RATE_LIMIT_SHARD=will be set
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ go 1.14

require (
github.com/go-redis/redis v6.15.7+incompatible
github.com/satori/go.uuid v1.2.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
20 changes: 5 additions & 15 deletions http/callback.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package http

import (
"bytes"
"crypto/tls"
"encoding/json"
"github.com/dbalduini/darko/shard"
"io"
"io/ioutil"
"net/http"
"os"
"time"
)

// transport is tuned for single host connection pool
var transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConns: 100,
Expand All @@ -23,24 +20,17 @@ var transport = &http.Transport{
var tuneHTTPClient = &http.Client{Transport: transport}

// PostCallback calls the callback url with the payload
func PostCallback(job shard.Job) (int, error) {
url := os.Getenv("CALLBACK_URL")

buf, err := json.Marshal(&job)
if err != nil {
return -1, err
}

req, err := http.NewRequest("POST", url, bytes.NewBuffer(buf))
func PostCallback(url string, body io.Reader) (int, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return -1, err
return 0, err
}

req.Header.Add("Content-Type", "application/json")

res, err := tuneHTTPClient.Do(req)
if err != nil {
return -1, err
return 0, err
}

defer res.Body.Close()
Expand Down
Loading

0 comments on commit 34a64e9

Please sign in to comment.