Skip to content

Commit

Permalink
More scheduling work. Now schedules jobs, observes kills and restarts…
Browse files Browse the repository at this point in the history
… `desired_instances` also supported (but not bug free)
  • Loading branch information
zefhemel committed Sep 10, 2021
1 parent 4ba03c6 commit 7dcfe7a
Show file tree
Hide file tree
Showing 19 changed files with 477 additions and 132 deletions.
40 changes: 35 additions & 5 deletions cmd/mls/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/zefhemel/matterless/pkg/application"
"github.com/zefhemel/matterless/pkg/client"
"github.com/zefhemel/matterless/pkg/config"
"github.com/zefhemel/matterless/pkg/util"
)

func runCommand() *cobra.Command {
Expand Down Expand Up @@ -78,7 +79,7 @@ func deployCommand() *cobra.Command {
}

cmdDeploy.Flags().BoolVarP(&watch, "watch", "w", false, "watch apps for changes and redeploy")
cmdDeploy.Flags().StringVar(&url, "url", "", "URL or Matterless server to deploy to")
cmdDeploy.Flags().StringVar(&url, "url", "http://localhost:8222", "URL or Matterless server to deploy to")
cmdDeploy.Flags().StringVarP(&adminToken, "token", "t", "", "Root token for Matterless server")

return cmdDeploy
Expand All @@ -98,7 +99,32 @@ func attachCommand() *cobra.Command {
runConsole(mlsClient, []string{})
},
}
cmd.Flags().StringVar(&url, "url", "", "URL or Matterless server to deploy to")
cmd.Flags().StringVar(&url, "url", "http://localhost:8222", "URL of matterless server to connect to")
cmd.Flags().StringVarP(&adminToken, "token", "t", "", "Root token for Matterless server")

return cmd
}

func infoCommand() *cobra.Command {
var (
url string
adminToken string
)
var cmd = &cobra.Command{
Use: "info",
Short: "Retrieve cluster information",
Args: cobra.MinimumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
mlsClient := client.NewMatterlessClient(url, adminToken)
info, err := mlsClient.ClusterInfo()
if err != nil {
fmt.Printf("Error fetching cluster info: %s\n", err)
return
}
fmt.Println(util.MustJsonString(info))
},
}
cmd.Flags().StringVar(&url, "url", "http://localhost:8222", "URL of matterless server to connect to")
cmd.Flags().StringVarP(&adminToken, "token", "t", "", "Root token for Matterless server")

return cmd
Expand Down Expand Up @@ -127,7 +153,7 @@ func main() {
log.SetLevel(log.DebugLevel)

cmd := rootCommand()
cmd.AddCommand(runCommand(), deployCommand(), attachCommand(), ppCommand())
cmd.AddCommand(runCommand(), deployCommand(), attachCommand(), infoCommand(), ppCommand())
cmd.Execute()
}

Expand All @@ -140,11 +166,15 @@ func runServer(cfg *config.Config) *application.Container {
// Subscribe to all logs and write to stdout
appContainer.ClusterConnection().Subscribe(fmt.Sprintf("%s.*.function.*.log", cfg.ClusterNatsPrefix), func(m *nats.Msg) {
parts := strings.Split(m.Subject, ".") // mls.myapp.function.MyFunction.log
log.Infof("[%s | %s] %s", parts[1], parts[3], string(m.Data))
log.Infof("LOG [%s | %s]: %s", parts[1], parts[3], string(m.Data))
})

if err := appContainer.Start(); err != nil {
log.Fatalf("Could not start container: %s", err)
}

// Handle Ctrl-c gracefully
killing := make(chan os.Signal)
killing := make(chan os.Signal, 1)
signal.Notify(killing, os.Interrupt, syscall.SIGTERM)
go func() {
<-killing
Expand Down
17 changes: 16 additions & 1 deletion pkg/application/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/zefhemel/matterless/pkg/definition"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (ag *APIGateway) exposeAdminAPI() {

// Rather than applying this locally, we'll store it just in the store, which in turn will lead to the app
// being loaded
if err := ag.container.Store().Put(fmt.Sprintf("app:%s", appName), string(defBytes)); err != nil {
if err := ag.container.Store().Put(fmt.Sprintf("app:%s", appName), defs); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err.Error())
return
Expand All @@ -58,6 +59,20 @@ func (ag *APIGateway) exposeAdminAPI() {
fmt.Fprint(w, util.MustJsonString(ag.container.List()))
}).Methods("GET")

ag.rootRouter.HandleFunc("/_info", func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if !ag.authAdmin(w, r) {
return
}
info, err := ag.container.ClusterEventBus().FetchClusterInfo(1 * time.Second)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err.Error())
return
}
fmt.Fprint(w, util.MustJsonString(info))
}).Methods("GET")

ag.rootRouter.HandleFunc("/{app}", func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
vars := mux.Vars(r)
Expand Down
79 changes: 46 additions & 33 deletions pkg/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ type Application struct {
// Definitions
appName string
definitions *definition.Definitions
code string

// Runtime
config *config.Config
eventBus *cluster.ClusterEventBus
eventsSubscription cluster.Subscription
sandbox *sandbox.Sandbox
config *config.Config
eventBus *cluster.ClusterEventBus
eventsSubscription cluster.Subscription
startWorkerSubscription cluster.Subscription
sandbox *sandbox.Sandbox

// API
apiToken string
Expand Down Expand Up @@ -87,6 +87,13 @@ func NewApplication(cfg *config.Config, appName string, s store.Store, ceb *clus
return nil, errors.Wrap(err, "event subscribe")
}

app.startWorkerSubscription, err = app.eventBus.SubscribeRequestJobWorker(func(jobName string) {
job := app.definitions.Jobs[definition.FunctionID(jobName)]
if err := app.sandbox.StartJobWorker(definition.FunctionID(jobName), job.Config, job.Code); err != nil {
log.Errorf("Could not start job %s: %s", jobName, err)
}
})

return app, nil
}

Expand Down Expand Up @@ -114,21 +121,7 @@ func (app *Application) PublishAppEvent(name string, event interface{}) error {
}))
}

func (app *Application) Eval(code string) error {
log.Debug("Parsing and checking definitions...")
app.code = code
defs, err := definition.Parse(code)
if err != nil {
return err
}

if err := defs.InlineImports(fmt.Sprintf("%s/.importcache", app.config.DataDir)); err != nil {
return err
}
if err := defs.ExpandMacros(); err != nil {
return err
}

func (app *Application) Eval(defs *definition.Definitions) error {
app.definitions = defs
defs.InterpolateStoreValues(app.dataStore)

Expand All @@ -143,27 +136,36 @@ func (app *Application) Eval(code string) error {
}
}

log.Info("Loading jobs...")
for name, def := range defs.Jobs {
if err := app.sandbox.LoadJob(string(name), def.Config, def.Code); err != nil {
log.Errorf("Could not spin up job worker for %s: %s", name, err)
}
}
//log.Info("Loading jobs...")
//for name, def := range defs.Jobs {
// if err := app.sandbox.StartJobWorker(name, def.Config, def.Code); err != nil {
// log.Errorf("Could not spin up job worker for %s: %s", name, err)
// }
//}

log.Info("Ready to go.")
return nil
}

func (app *Application) StartJobs() error {
log.Info("Starting jobs...")
for name, def := range app.definitions.Jobs {
if _, err := app.InvokeFunction(string(name), def.Config.Init); err != nil {
return err
}
func (app *Application) EvalString(code string) error {
defs, err := definition.Check(code, app.config)
if err != nil {
return err
}
return nil

return app.Eval(defs)
}

//func (app *Application) StartJobs() error {
// log.Info("Starting jobs...")
// for name, def := range app.definitions.Jobs {
// if _, err := app.InvokeFunction(string(name), def.Config.Init); err != nil {
// return err
// }
// }
// return nil
//}

// reset but ready to start again
func (app *Application) reset() {
app.sandbox.Flush()
Expand All @@ -173,6 +175,9 @@ func (app *Application) Close() error {
if err := app.eventsSubscription.Unsubscribe(); err != nil {
return err
}
if err := app.startWorkerSubscription.Unsubscribe(); err != nil {
return err
}
app.reset()
return app.dataStore.Close()
}
Expand All @@ -184,3 +189,11 @@ func (app *Application) Definitions() *definition.Definitions {
func (app *Application) EventBus() *cluster.ClusterEventBus {
return app.eventBus
}

func (app *Application) Sandbox() *sandbox.Sandbox {
return app.sandbox
}

func (app *Application) Name() string {
return app.appName
}
9 changes: 4 additions & 5 deletions pkg/application/application_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ func (ag *APIGateway) exposeApplicationAPI() {
if !ag.authApp(w, r, app) {
return
}
if err := app.Eval(app.code); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Error: %s", err)
return
}

panic("TO IMPLEMENT")
// TODO Implement again with cluster event

fmt.Fprint(w, `{"status": "ok"}`)
}).Methods("POST")
}
Loading

0 comments on commit 7dcfe7a

Please sign in to comment.