Commit

Changed apps to use milliseconds instead of seconds for timestamps to be more accurate
frikky committed Nov 30, 2023
1 parent ff7ca90 commit 706a8ee
Showing 5 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .env
@@ -70,7 +70,7 @@ SHUFFLE_SWARM_BRIDGE_DEFAULT_MTU=1500 # 1500 by default
# Used for auto-cleanup of containers. REALLY important at scale. Set to false to see all container info.
SHUFFLE_MEMCACHED=
SHUFFLE_CONTAINER_AUTO_CLEANUP=true
SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=7 # The amount of concurrent executions Orborus can handle. This is a soft limit, but it's recommended to keep it low.
SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=5 # The amount of concurrent executions Orborus can handle. This is a soft limit, but it's recommended to keep it low.
SHUFFLE_HEALTHCHECK_DISABLED=false
SHUFFLE_ELASTIC=true
SHUFFLE_LOGS_DISABLED=false
20 changes: 11 additions & 9 deletions backend/app_sdk/app_base.py
@@ -302,17 +302,19 @@ def __init__(self, redis=None, logger=None, console_logger=None):#, docker_clien
self.authorization = os.getenv("AUTHORIZATION", "")
self.current_execution_id = os.getenv("EXECUTIONID", "")
self.full_execution = os.getenv("FULL_EXECUTION", "")
self.start_time = int(time.time())
self.result_wrapper_count = 0

# Make start time with milliseconds
self.start_time = int(time.time_ns())

self.action_result = {
"action": self.action,
"authorization": self.authorization,
"execution_id": self.current_execution_id,
"result": f"",
"started_at": self.start_time,
"status": "",
"completed_at": int(time.time()),
"completed_at": int(time.time_ns()),
}

if isinstance(self.action, str):
@@ -468,7 +470,7 @@ def send_result(self, action_result, headers, stream_path):

# Try it with some magic

action_result["completed_at"] = int(time.time())
action_result["completed_at"] = int(time.time_ns())
self.logger.info(f"""[DEBUG] Inside Send result with status {action_result["status"]}""")
#if isinstance(action_result,

@@ -1011,7 +1013,7 @@ def run_recursed_items(self, func, baseparams, loop_wrapper):
"result": f"All {len(param_multiplier)} values were non-unique",
"started_at": self.start_time,
"status": "SKIPPED",
"completed_at": int(time.time()),
"completed_at": int(time.time_ns()),
}

self.send_result(self.action_result, {"Content-Type": "application/json", "Authorization": "Bearer %s" % self.authorization}, "/api/v1/streams")
@@ -1457,7 +1459,7 @@ def execute_action(self, action):
"authorization": self.authorization,
"execution_id": self.current_execution_id,
"result": "",
"started_at": int(time.time()),
"started_at": int(time.time_ns()),
"status": "EXECUTING"
}

@@ -2469,7 +2471,7 @@ def parse_liquid(template, self):
self.action_result["result"] = f"Failed to parse LiquidPy: {error_msg}"
print("[WARNING] Failed to set LiquidPy result")

self.action_result["completed_at"] = int(time.time())
self.action_result["completed_at"] = int(time.time_ns())
self.send_result(self.action_result, headers, stream_path)

self.logger.info(f"[ERROR] Sent FAILURE response to backend due to : {e}")
@@ -3071,7 +3073,7 @@ def check_branch_conditions(action, fullexecution, self):
self.logger.info("Failed one or more branch conditions.")
self.action_result["result"] = tmpresult
self.action_result["status"] = "SKIPPED"
self.action_result["completed_at"] = int(time.time())
self.action_result["completed_at"] = int(time.time_ns())

self.send_result(self.action_result, headers, stream_path)
return
@@ -3557,7 +3559,7 @@ def check_branch_conditions(action, fullexecution, self):
self.logger.info("[WARNING] SHOULD STOP EXECUTION BECAUSE FIELDS AREN'T UNIQUE")
self.action_result["status"] = "SKIPPED"
self.action_result["result"] = f"A non-unique value was found"
self.action_result["completed_at"] = int(time.time())
self.action_result["completed_at"] = int(time.time_ns())
self.send_result(self.action_result, headers, stream_path)
return

@@ -3885,7 +3887,7 @@ async def parse_value(newres):
})

# Send the result :)
self.action_result["completed_at"] = int(time.time())
self.action_result["completed_at"] = int(time.time_ns())
self.send_result(self.action_result, headers, stream_path)

#try:
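For reference, a minimal sketch of the resolutions involved here: the app SDK moves from time.time() (Unix seconds) to time.time_ns() (Unix nanoseconds) for the started_at/completed_at fields. If a consumer specifically wants milliseconds it still has to convert; the conversions below are illustrative and not part of this commit.

```python
import time

# Unix timestamp at the two resolutions used before/after this commit.
started_s = int(time.time())    # seconds, e.g. 1701302400 (old behaviour)
started_ns = time.time_ns()     # nanoseconds, e.g. 1701302400123456789 (new behaviour)

# Illustrative conversions to milliseconds (assumption: a consumer wants ms).
started_ms_from_float = int(time.time() * 1000)
started_ms_from_ns = started_ns // 1_000_000

print(started_s, started_ns, started_ms_from_float, started_ms_from_ns)
```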
8 changes: 6 additions & 2 deletions backend/go-app/main.go
@@ -3706,6 +3706,10 @@ func runInitEs(ctx context.Context) {
continue
}

// FIXME: Add a randomized timer to avoid all schedules running at the same time
// Many are at 5 minutes / 1 hour. The point is to spread these out
// a bit instead of all of them starting at the exact same time

//log.Printf("Schedule: %#v", schedule)
//log.Printf("Schedule time: every %d seconds", schedule.Seconds)
jobret, err := newscheduler.Every(schedule.Seconds).Seconds().NotImmediately().Run(job(schedule))
@@ -3984,7 +3988,7 @@ func runInitEs(ctx context.Context) {

r, err := git.Clone(storer, fs, cloneOptions)
if err != nil {
log.Printf("[WARNING] Failed loading repo into memory (init): %s", err)
log.Printf("[ERROR] Failed loading repo into memory (init): %s", err)
}

dir, err := fs.ReadDir("")
@@ -4021,7 +4025,7 @@ func runInitEs(ctx context.Context) {
}
_, err = git.Clone(storer, fs, cloneOptions)
if err != nil {
log.Printf("[WARNING] Failed loading repo %s into memory: %s", apis, err)
log.Printf("[ERROR] Failed loading repo %s into memory: %s", apis, err)
} else if err == nil && len(workflowapps) < 10 {
log.Printf("[INFO] Finished git clone. Looking for updates to the repo.")
dir, err := fs.ReadDir("")
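The FIXME in the first hunk of main.go describes spreading schedule start times with a randomized delay, so that the many 5-minute / 1-hour schedules do not all fire at the same instant. A minimal sketch of one way that could look; startWithJitter, the jitter bound, and the ticker loop are illustrative assumptions, not code from this commit or from the newscheduler package.

```go
package main

import (
	"log"
	"math/rand"
	"time"
)

// startWithJitter is a hypothetical helper: it waits a random delay
// (up to maxJitter) before the first run, so schedules created in the
// same loop do not all tick at exactly the same moment.
func startWithJitter(name string, interval, maxJitter time.Duration, job func()) {
	go func() {
		jitter := time.Duration(rand.Int63n(int64(maxJitter)))
		log.Printf("[DEBUG] Delaying schedule %s by %s before first run", name, jitter)
		time.Sleep(jitter)

		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			job()
		}
	}()
}

func main() {
	startWithJitter("example", 5*time.Minute, 30*time.Second, func() {
		log.Printf("[DEBUG] schedule fired")
	})
	select {} // keep the demo process alive
}
```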
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -43,6 +43,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
environment:
- SHUFFLE_APP_SDK_TIMEOUT=300
- SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=5 # The amount of concurrent executions Orborus can handle.
#- DOCKER_HOST=tcp://docker-socket-proxy:2375
- ENVIRONMENT_NAME=${ENVIRONMENT_NAME}
- BASE_URL=http://${OUTER_HOSTNAME}:5001
2 changes: 1 addition & 1 deletion functions/onprem/orborus/orborus.go
@@ -63,7 +63,7 @@ var sleepTime = 2

// Making it work on low-end machines even during busy times :)
// May cause some things to run slowly
var maxConcurrency = 3
var maxConcurrency = 5

// Timeout if something crashes
var workerTimeoutEnv = os.Getenv("SHUFFLE_ORBORUS_EXECUTION_TIMEOUT")
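The compiled-in default moves from 3 to 5, matching the SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=5 now set in .env and docker-compose.yml. A sketch, assumed rather than taken from orborus.go, of how an environment variable typically overrides such a compiled-in default:

```go
package main

import (
	"log"
	"os"
	"strconv"
)

// Compiled-in fallback, matching the new default in orborus.go.
var maxConcurrency = 5

func main() {
	// Hypothetical override: a positive integer in the env var from
	// .env / docker-compose.yml replaces the compiled-in default.
	if raw := os.Getenv("SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY"); raw != "" {
		if parsed, err := strconv.Atoi(raw); err == nil && parsed > 0 {
			maxConcurrency = parsed
		} else {
			log.Printf("[WARNING] Ignoring invalid SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=%q", raw)
		}
	}

	log.Printf("[INFO] Orborus execution concurrency: %d", maxConcurrency)
}
```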
