Skip to content

Commit

Permalink
Plumb config - Latest (gomods#627)
Browse files Browse the repository at this point in the history
* switch proxy to config file

pull in single flight changes

* changes for single-flight

* intermediate stage. All tests passing. pkg still has env refs

* remove all env references

* delete config/env entirely

* fix failing tests

* create the config.toml file as part of dev setup

* create config file only if it doesn't exist

* update Dockerfiles to use config file

* move composing elements to the top

* verbose parameter naming

* newline

* add flag for config file path

* update docs with config file flag

* remove unnecessary nil check

* use filepath.join

* rename redis port to address

* fix path.join

* fix issues after merge

* add vendor dir
  • Loading branch information
rohancme authored and arschles committed Sep 11, 2018
1 parent bb25043 commit 0e470d0
Show file tree
Hide file tree
Showing 207 changed files with 3,395 additions and 19,363 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ cmd/olympus/bin
cmd/proxy/bin
.idea
.DS_Store


# prod config file
config.toml
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ alldeps:
dev:
docker-compose -p athensdev up -d mongo
docker-compose -p athensdev up -d redis
./scripts/create_default_config.sh

.PHONY: down
down:
Expand Down
4 changes: 2 additions & 2 deletions charts/proxy/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ spec:
value: {{ .Values.storage.mongo.url | quote }}
{{- end }}
# TODO: re-enable when workers are used
#- name: ATHENS_REDIS_QUEUE_PORT
# value: {{ .Values.redisPort | quote }}
#- name: ATHENS_REDIS_QUEUE_ADDRESS
# value: {{ .Values.redis.address | quote }}
ports:
- containerPort: 3000
{{- if eq .Values.storage.type "disk" }}
Expand Down
2 changes: 1 addition & 1 deletion charts/proxy/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ storage:
# Worker is disabled right now. When it's turned on, we can use helm dependencies to start this up!
#redis:
# useEmbedded: false
# port: 6379
# address: ":6379"
# username:
# password:
# host:
5 changes: 4 additions & 1 deletion cmd/olympus/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ WORKDIR $GOPATH/src/github.com/gomods/athens
ADD . .

RUN cd cmd/olympus && buffalo build -s -o /bin/app
RUN scripts/create_default_config.sh
COPY config.toml /bin/config.toml

FROM alpine
RUN apk add --no-cache bash
Expand All @@ -25,6 +27,7 @@ RUN apk add --no-cache ca-certificates
WORKDIR /bin/

COPY --from=builder /bin/app .
COPY --from=builder /bin/config.toml .

# Comment out to run the binary in "production" mode:
# ENV GO_ENV=production
Expand All @@ -36,4 +39,4 @@ EXPOSE 3000

# Comment out to run the migrations before running the binary:
# CMD /bin/app migrate; /bin/app
CMD exec /bin/app
CMD exec /bin/app -config_file=config.toml
26 changes: 9 additions & 17 deletions cmd/olympus/actions/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,32 @@ package actions

import (
"encoding/json"
"path/filepath"
"testing"
"time"

"github.com/gobuffalo/gocraft-work-adapter"
"github.com/gobuffalo/suite"
"github.com/gocraft/work"
"github.com/gomods/athens/pkg/config/env"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/eventlog/mongo"
"github.com/gomods/athens/pkg/payloads"
"github.com/gomods/athens/pkg/storage/mem"
)

var (
testConfigFile = filepath.Join("..", "..", "..", "config.test.toml")
)

type ActionSuite struct {
*suite.Action
}

func Test_ActionSuite(t *testing.T) {
stg, err := mem.NewStorage()
conf := config.GetConfLogErr(testConfigFile, t)
app, err := App(conf)
if err != nil {
t.Fatalf("error creating storage (%s)", err)
}
mURI := env.MongoConnectionString()
certPath := env.MongoCertPath()
eLog, err := mongo.NewLog(mURI, certPath)
if err != nil {
t.Fatalf("error creating event log (%s)", err)
}
config := AppConfig{
Storage: stg,
EventLog: eLog,
CacheMissesLog: eLog,
t.Fatalf("Failed to initialize app: %s", err)
}
app, err := App(&config)
as := &ActionSuite{suite.NewAction(app)}
suite.Run(t, as)
}
Expand Down
101 changes: 63 additions & 38 deletions cmd/olympus/actions/app.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package actions

import (
"fmt"
stdlog "log"
"time"

"github.com/gobuffalo/buffalo"
"github.com/gobuffalo/buffalo/middleware"
Expand All @@ -13,7 +15,7 @@ import (
"github.com/gobuffalo/packr"
"github.com/gocraft/work"
"github.com/gomods/athens/pkg/cdn/metadata/azurecdn"
"github.com/gomods/athens/pkg/config/env"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/download"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/log"
Expand All @@ -22,15 +24,19 @@ import (
"github.com/gomods/athens/pkg/storage"
"github.com/gomodule/redigo/redis"
"github.com/rs/cors"
"github.com/sirupsen/logrus"
"github.com/spf13/afero"
"github.com/unrolled/secure"
)

// AppConfig contains dependencies used in App
type AppConfig struct {
Storage storage.Backend
EventLog eventlog.Eventlog
CacheMissesLog eventlog.Appender
type workerConfig struct {
store storage.Backend
eLog eventlog.Eventlog
wType string
redisEndpoint string
maxConc int
maxFails uint
downloadTimeout time.Duration
}

const (
Expand All @@ -47,9 +53,6 @@ var (
workerModuleKey = "module"
workerVersionKey = "version"
workerPushNotificationKey = "push-notification"
// ENV is used to help switch settings based on where the
// application is being run. Default is "development".
ENV = env.GoEnvironmentWithDefault("development")
// T is buffalo Translator
T *i18n.Translator
)
Expand All @@ -59,25 +62,48 @@ const Service = "olympus"

// App is where all routes and middleware for buffalo should be defined.
// This is the nerve center of your application.
func App(config *AppConfig) (*buffalo.App, error) {
port := env.Port(":3001")
func App(conf *config.Config) (*buffalo.App, error) {
// ENV is used to help switch settings based on where the
// application is being run. Default is "development".
ENV := conf.GoEnv
port := conf.Olympus.Port

w, err := getWorker(config.Storage, config.EventLog)
storage, err := GetStorage(conf.Olympus.StorageType, conf.Storage)
if err != nil {
return nil, err
}
if conf.Storage == nil || conf.Storage.Mongo == nil {
return nil, fmt.Errorf("A valid Mongo configuration is required to create the event log")
}
eLog, err := GetEventLog(conf.Storage.Mongo.URL, conf.Storage.Mongo.CertPath, conf.Storage.Mongo.TimeoutDuration())
if err != nil {
return nil, fmt.Errorf("error creating eventlog (%s)", err)
}
wConf := workerConfig{
store: storage,
eLog: eLog,
wType: conf.Olympus.WorkerType,
maxConc: conf.MaxConcurrency,
maxFails: conf.MaxWorkerFails,
downloadTimeout: conf.TimeoutDuration(),
redisEndpoint: conf.Olympus.RedisQueueAddress,
}
w, err := getWorker(wConf)
if err != nil {
return nil, err
}

lvl, err := env.LogLevel()
logLvl, err := logrus.ParseLevel(conf.LogLevel)
if err != nil {
return nil, err
}
lggr := log.New(env.CloudRuntime(), lvl)
lggr := log.New(conf.CloudRuntime, logLvl)

blvl, err := env.BuffaloLogLevel()
bLogLvl, err := logrus.ParseLevel(conf.BuffaloLogLevel)
if err != nil {
return nil, err
}
blggr := log.Buffalo(blvl)
blggr := log.Buffalo(bLogLvl)

app := buffalo.New(buffalo.Options{
Addr: port,
Expand All @@ -102,7 +128,7 @@ func App(config *AppConfig) (*buffalo.App, error) {
}
// Protect against CSRF attacks. https://www.owasp.org/index.php/Cross-Site_Request_Forgery_(CSRF)
// Remove to disable this.
if env.EnableCSRFProtection() {
if conf.EnableCSRFProtection {
csrfMiddleware := csrf.New
app.Use(csrfMiddleware)
}
Expand All @@ -120,23 +146,23 @@ func App(config *AppConfig) (*buffalo.App, error) {
}
app.Use(T.Middleware())

app.GET("/diff/{lastID}", diffHandler(config.Storage, config.EventLog))
app.GET("/feed/{lastID}", feedHandler(config.Storage))
app.GET("/eventlog/{sequence_id}", eventlogHandler(config.EventLog))
app.GET("/diff/{lastID}", diffHandler(storage, eLog))
app.GET("/feed/{lastID}", feedHandler(storage))
app.GET("/eventlog/{sequence_id}", eventlogHandler(eLog))
app.POST("/cachemiss", cachemissHandler(w))
app.POST("/push", pushNotificationHandler(w))
app.GET("/healthz", healthHandler)

// Download Protocol
goBin := env.GoBinPath()
goBin := conf.GoBinary
fs := afero.NewOsFs()
mf, err := module.NewGoGetFetcher(goBin, fs)
if err != nil {
return nil, err
}
st := stash.New(mf, config.Storage)
st := stash.New(mf, storage)
dpOpts := &download.Opts{
Storage: config.Storage,
Storage: storage,
Stasher: st,
GoBinPath: goBin,
Fs: fs,
Expand All @@ -151,46 +177,45 @@ func App(config *AppConfig) (*buffalo.App, error) {
return app, nil
}

func getWorker(store storage.Backend, eLog eventlog.Eventlog) (worker.Worker, error) {
workerType := env.OlympusBackgroundWorkerType()
switch workerType {
func getWorker(wConf workerConfig) (worker.Worker, error) {
switch wConf.wType {
case "redis":
return registerRedis(store, eLog)
return registerRedis(wConf)
case "memory":
return registerInMem(store, eLog)
return registerInMem(wConf)
default:
stdlog.Printf("Provided background worker type %s. Expected redis|memory. Defaulting to memory", workerType)
return registerInMem(store, eLog)
stdlog.Printf("Provided background worker type %s. Expected redis|memory. Defaulting to memory", wConf.wType)
return registerInMem(wConf)
}
}

func registerInMem(store storage.Backend, eLog eventlog.Eventlog) (worker.Worker, error) {
func registerInMem(wConf workerConfig) (worker.Worker, error) {
w := worker.NewSimple()
if err := w.Register(PushNotificationHandlerName, GetProcessPushNotificationJob(store, eLog)); err != nil {
if err := w.Register(PushNotificationHandlerName, GetProcessPushNotificationJob(wConf.store, wConf.eLog, wConf.downloadTimeout)); err != nil {
return nil, err
}
return w, nil
}

func registerRedis(store storage.Backend, eLog eventlog.Eventlog) (worker.Worker, error) {
port := env.OlympusRedisQueuePortWithDefault(":6379")
func registerRedis(wConf workerConfig) (worker.Worker, error) {
addr := wConf.redisEndpoint
w := gwa.New(gwa.Options{
Pool: &redis.Pool{
MaxActive: 5,
MaxIdle: 5,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", port)
return redis.Dial("tcp", addr)
},
},
Name: OlympusWorkerName,
MaxConcurrency: env.AthensMaxConcurrency(),
MaxConcurrency: wConf.maxConc,
})

opts := work.JobOptions{
SkipDead: true,
MaxFails: env.WorkerMaxFails(),
MaxFails: wConf.maxFails,
}

return w, w.RegisterWithOptions(PushNotificationHandlerName, opts, GetProcessPushNotificationJob(store, eLog))
return w, w.RegisterWithOptions(PushNotificationHandlerName, opts, GetProcessPushNotificationJob(wConf.store, wConf.eLog, wConf.downloadTimeout))
}
28 changes: 17 additions & 11 deletions cmd/olympus/actions/eventlog.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package actions

import (
"github.com/gomods/athens/pkg/config/env"
"time"

"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/eventlog/mongo"
)

// GetEventLog returns implementation of eventlog.EventLog
func GetEventLog() (eventlog.Eventlog, error) {
connectionString := env.MongoConnectionString()
certPath := env.MongoCertPath()
l, err := mongo.NewLog(connectionString, certPath)
return l, err
func GetEventLog(mongoURL string, certPath string, timeout time.Duration) (eventlog.Eventlog, error) {
const op = "actions.GetEventLog"
l, err := mongo.NewLog(mongoURL, certPath, timeout)
if err != nil {
return nil, errors.E(op, err)
}
return l, nil
}

// NewCacheMissesLog returns impl. of eventlog.Appender
func NewCacheMissesLog() (eventlog.Appender, error) {
connectionString := env.MongoConnectionString()
certPath := env.MongoCertPath()
l, err := mongo.NewLogWithCollection(connectionString, certPath, "cachemisseslog")
return l, err
func NewCacheMissesLog(mongoURL string, certPath string, timeout time.Duration) (eventlog.Appender, error) {
const op = "actions.NewCacheMissesLog"
l, err := mongo.NewLogWithCollection(mongoURL, certPath, "cachemisseslog", timeout)
if err != nil {
return nil, errors.E(op, err)
}
return l, nil
}
9 changes: 4 additions & 5 deletions cmd/olympus/actions/merge_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
"time"

"github.com/gomods/athens/pkg/config/env"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/module"
"github.com/gomods/athens/pkg/storage"
Expand All @@ -23,10 +22,10 @@ import (
// - Delete operation adds tombstone to module metadata k/v store
//
// Both could be fixed by putting each 'for' loop into a (global) critical section
func mergeDB(ctx context.Context, originURL string, diff dbDiff, eLog eventlog.Eventlog, storage storage.Backend, downloader module.Downloader) error {
func mergeDB(ctx context.Context, originURL string, diff dbDiff, eLog eventlog.Eventlog, storage storage.Backend, downloader module.Downloader, downloadTimeout time.Duration) error {
var errors error
for _, added := range diff.Added {
if err := add(ctx, added, originURL, eLog, storage, downloader); err != nil {
if err := add(ctx, added, originURL, eLog, storage, downloader, downloadTimeout); err != nil {
errors = multierror.Append(errors, err)
}
}
Expand All @@ -43,15 +42,15 @@ func mergeDB(ctx context.Context, originURL string, diff dbDiff, eLog eventlog.E
return errors
}

func add(ctx context.Context, event eventlog.Event, originURL string, eLog eventlog.Eventlog, storage storage.Backend, downloader module.Downloader) error {
func add(ctx context.Context, event eventlog.Event, originURL string, eLog eventlog.Eventlog, storage storage.Backend, downloader module.Downloader, downloadTimeout time.Duration) error {
if _, err := eLog.ReadSingle(event.Module, event.Version); err != nil {
// the module/version already exists, is deprecated, or is
// tombstoned, so nothing to do
return err
}

// download code from the origin
data, err := downloader(ctx, env.Timeout(), originURL, event.Module, event.Version)
data, err := downloader(ctx, downloadTimeout, originURL, event.Module, event.Version)
if err != nil {
log.Printf("error downloading new module %s/%s from %s (%s)", event.Module, event.Version, originURL, err)
return err
Expand Down
Loading

0 comments on commit 0e470d0

Please sign in to comment.