Skip to content

Commit

Permalink
Merge pull request #447 from leaf-ai/ASD-venv-mode-only01
Browse files Browse the repository at this point in the history
Add a mode to only generate the workload's Python virtual environment.
  • Loading branch information
andreidenissov-cog authored Jan 3, 2025
2 parents 40a386a + 332fbef commit c1327dc
Show file tree
Hide file tree
Showing 34 changed files with 407 additions and 2,765 deletions.
1 change: 1 addition & 0 deletions Dockerfile.2stage
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ RUN curl https://pyenv.run | /bin/bash && \
eval "$(pyenv virtualenv-init -)" && \
pyenv install --list | grep " 3\.[56789]" && \
pyenv install 3.10.11 && \
pyenv install 3.12.8 && \
pyenv global 3.10.11

RUN \
Expand Down
2 changes: 1 addition & 1 deletion cmd/runner/filequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func serviceFileQueue(ctx context.Context, checkInterval time.Duration) {
}

matcher, mismatcher := initFileQueueParams()
fqProject := runner.NewLocalQueue(*localQueueRootOpt, nil, logger)
fqProject := runner.NewLocalQueue(*localQueueRootOpt, logger)

// Tracks all known queues and their cancel functions so they can have any
// running jobs terminated should they disappear
Expand Down
34 changes: 13 additions & 21 deletions cmd/runner/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"flag"
"fmt"
"github.com/leaf-ai/studio-go-runner/internal/request"
"sync/atomic"
"time"

Expand All @@ -15,7 +16,6 @@ import (

"github.com/leaf-ai/go-service/pkg/network"
"github.com/leaf-ai/go-service/pkg/server"
"github.com/leaf-ai/studio-go-runner/internal/request"
"github.com/leaf-ai/studio-go-runner/internal/task"
)

Expand Down Expand Up @@ -43,9 +43,10 @@ func HandleMsg(ctx context.Context, qt *task.QueueTask) (rsc *server.Resource, c

// allocate the processor and use the subscription name as the group by for work coming down the
// pipe that is sent to the resource allocation module
proc, hardError, err := newProcessor(ctx, qt, accessionID)
proc, hardError, err := GetNewProcessor(ctx, qt, accessionID)
if proc != nil {
rsc = proc.Request.Experiment.Resource.Clone()
task_req := proc.GetRequest()
rsc = task_req.Experiment.Resource.Clone()
if rsc == nil {
logger.Warn("resource spec empty", "subscription", qt.Subscription, "stack", stack.Trace().TrimRuntime())
}
Expand All @@ -59,7 +60,8 @@ func HandleMsg(ctx context.Context, qt *task.QueueTask) (rsc *server.Resource, c
// Check for the presence of artifact credentials and if we see none, then for backward
// compatibility, see if there are AWS credentials in the env variables and if so load these
// into the artifacts
for key, art := range proc.Request.Experiment.Artifacts {
task_req := proc.GetRequest()
for key, art := range task_req.Experiment.Artifacts {
if art.Credentials.Plain != nil {
continue
}
Expand All @@ -70,16 +72,16 @@ func HandleMsg(ctx context.Context, qt *task.QueueTask) (rsc *server.Resource, c
continue
}
if *allowEnvSecrets {
if accessKey, isPresent := proc.Request.Config.Env["AWS_ACCESS_KEY_ID"]; isPresent {
secretKey := proc.Request.Config.Env["AWS_SECRET_ACCESS_KEY"]
if accessKey, isPresent := task_req.Config.Env["AWS_ACCESS_KEY_ID"]; isPresent {
secretKey := task_req.Config.Env["AWS_SECRET_ACCESS_KEY"]
newArt := art.Clone()
newArt.Credentials = request.Credentials{
AWS: &request.AWSCredential{
AccessKey: accessKey,
SecretKey: secretKey,
},
}
proc.Request.Experiment.Artifacts[key] = *newArt
task_req.Experiment.Artifacts[key] = *newArt
}
}
}
Expand All @@ -99,25 +101,15 @@ func HandleMsg(ctx context.Context, qt *task.QueueTask) (rsc *server.Resource, c
atomic.AddInt32(&queueRan, 1)

logger.Debug("experiment completed", "duration", time.Since(startTime).String(),
"experiment_id", proc.Request.Experiment.Key,
"project_id", proc.Request.Config.Database.ProjectId, "root_dir", proc.RootDir,
"experiment_id", task_req.Experiment.Key,
"project_id", task_req.Config.Database.ProjectId, "root_dir", proc.GetRootDir(),
"subscription", qt.Subscription)
}()

logger.Debug("experiment started", "experiment_id", proc.Request.Experiment.Key,
"project_id", proc.Request.Config.Database.ProjectId, "root_dir", proc.RootDir,
logger.Debug("experiment started", "experiment_id", task_req.Experiment.Key,
"project_id", task_req.Config.Database.ProjectId, "root_dir", proc.GetRootDir(),
"subscription", qt.Subscription)

if qt.ResponseQ != nil {
select {
case qt.ResponseQ <- "":
default:
// If this queue backs up, don't respond to failures,
// as back pressure is a sign of something very wrong
// that we cannot correct
}
}

// Blocking call to run the entire task and only return on termination due to the context
// being canceled or its own error / success
ack, err := proc.Process(ctx)
Expand Down
10 changes: 0 additions & 10 deletions cmd/runner/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"github.com/leaf-ai/go-service/pkg/server"
"github.com/leaf-ai/go-service/pkg/types"
"github.com/leaf-ai/studio-go-runner/internal/defense"
"net/http"
"os"
"testing"
Expand Down Expand Up @@ -82,15 +81,6 @@ func Test0InitK8s(t *testing.T) {
if err := server.IsAliveK8s(); err != nil {
t.Fatal(err)
}
w, err := defense.KubernetesWrapper(*msgEncryptDirOpt)
if err != nil {
t.Fatal(err)
}

// If kubernetes is present there MUST be secrets loaded to run message encryption
if w == nil {
t.Fatal(kv.NewError("wrapper missing").With("stack", stack.Trace().TrimRuntime()))
}
}

// TestK8sConfigNode is used to test that both the global and the node specific config
Expand Down
127 changes: 0 additions & 127 deletions cmd/runner/limiter.go

This file was deleted.

51 changes: 0 additions & 51 deletions cmd/runner/limiter_test.go

This file was deleted.

Loading

0 comments on commit c1327dc

Please sign in to comment.