From 706a8ee233b80a4f53ed5ccf25310c615c0e8e2a Mon Sep 17 00:00:00 2001 From: Frikky Date: Thu, 30 Nov 2023 21:11:16 +0100 Subject: [PATCH] Changed apps to use milliseconds instead of seconds for timestamps to be more accurate --- .env | 2 +- backend/app_sdk/app_base.py | 20 +++++++++++--------- backend/go-app/main.go | 8 ++++++-- docker-compose.yml | 1 + functions/onprem/orborus/orborus.go | 2 +- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/.env b/.env index f2e20b1fc..73fbdf636 100755 --- a/.env +++ b/.env @@ -70,7 +70,7 @@ SHUFFLE_SWARM_BRIDGE_DEFAULT_MTU=1500 # 1500 by default # Used for auto-cleanup of containers. REALLY important at scale. Set to false to see all container info. SHUFFLE_MEMCACHED= SHUFFLE_CONTAINER_AUTO_CLEANUP=true -SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=7 # The amount of concurrent executions Orborus can handle. This is a soft limit, but it's recommended to keep it low. +SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=5 # The amount of concurrent executions Orborus can handle. This is a soft limit, but it's recommended to keep it low. SHUFFLE_HEALTHCHECK_DISABLED=false SHUFFLE_ELASTIC=true SHUFFLE_LOGS_DISABLED=false diff --git a/backend/app_sdk/app_base.py b/backend/app_sdk/app_base.py index 156b2239d..753849beb 100755 --- a/backend/app_sdk/app_base.py +++ b/backend/app_sdk/app_base.py @@ -302,9 +302,11 @@ def __init__(self, redis=None, logger=None, console_logger=None):#, docker_clien self.authorization = os.getenv("AUTHORIZATION", "") self.current_execution_id = os.getenv("EXECUTIONID", "") self.full_execution = os.getenv("FULL_EXECUTION", "") - self.start_time = int(time.time()) self.result_wrapper_count = 0 + # Make start time with milliseconds + self.start_time = int(time.time_ns()) + self.action_result = { "action": self.action, "authorization": self.authorization, @@ -312,7 +314,7 @@ def __init__(self, redis=None, logger=None, console_logger=None):#, docker_clien "result": f"", "started_at": self.start_time, "status": "", - "completed_at": int(time.time()), + "completed_at": int(time.time_ns()), } if isinstance(self.action, str): @@ -468,7 +470,7 @@ def send_result(self, action_result, headers, stream_path): # Try it with some magic - action_result["completed_at"] = int(time.time()) + action_result["completed_at"] = int(time.time_ns()) self.logger.info(f"""[DEBUG] Inside Send result with status {action_result["status"]}""") #if isinstance(action_result, @@ -1011,7 +1013,7 @@ def run_recursed_items(self, func, baseparams, loop_wrapper): "result": f"All {len(param_multiplier)} values were non-unique", "started_at": self.start_time, "status": "SKIPPED", - "completed_at": int(time.time()), + "completed_at": int(time.time_ns()), } self.send_result(self.action_result, {"Content-Type": "application/json", "Authorization": "Bearer %s" % self.authorization}, "/api/v1/streams") @@ -1457,7 +1459,7 @@ def execute_action(self, action): "authorization": self.authorization, "execution_id": self.current_execution_id, "result": "", - "started_at": int(time.time()), + "started_at": int(time.time_ns()), "status": "EXECUTING" } @@ -2469,7 +2471,7 @@ def parse_liquid(template, self): self.action_result["result"] = f"Failed to parse LiquidPy: {error_msg}" print("[WARNING] Failed to set LiquidPy result") - self.action_result["completed_at"] = int(time.time()) + self.action_result["completed_at"] = int(time.time_ns()) self.send_result(self.action_result, headers, stream_path) self.logger.info(f"[ERROR] Sent FAILURE response to backend due to : {e}") @@ -3071,7 +3073,7 @@ def check_branch_conditions(action, fullexecution, self): self.logger.info("Failed one or more branch conditions.") self.action_result["result"] = tmpresult self.action_result["status"] = "SKIPPED" - self.action_result["completed_at"] = int(time.time()) + self.action_result["completed_at"] = int(time.time_ns()) self.send_result(self.action_result, headers, stream_path) return @@ -3557,7 +3559,7 @@ def check_branch_conditions(action, fullexecution, self): self.logger.info("[WARNING] SHOULD STOP EXECUTION BECAUSE FIELDS AREN'T UNIQUE") self.action_result["status"] = "SKIPPED" self.action_result["result"] = f"A non-unique value was found" - self.action_result["completed_at"] = int(time.time()) + self.action_result["completed_at"] = int(time.time_ns()) self.send_result(self.action_result, headers, stream_path) return @@ -3885,7 +3887,7 @@ async def parse_value(newres): }) # Send the result :) - self.action_result["completed_at"] = int(time.time()) + self.action_result["completed_at"] = int(time.time_ns()) self.send_result(self.action_result, headers, stream_path) #try: diff --git a/backend/go-app/main.go b/backend/go-app/main.go index 7f561852b..9717d6d3d 100755 --- a/backend/go-app/main.go +++ b/backend/go-app/main.go @@ -3706,6 +3706,10 @@ func runInitEs(ctx context.Context) { continue } + // FIXME: Add a randomized timer to avoid all schedules running at the same time + // Many are at 5 minutes / 1 hour. The point is to spread these out + // a bit instead of all of them starting at the exact same time + //log.Printf("Schedule: %#v", schedule) //log.Printf("Schedule time: every %d seconds", schedule.Seconds) jobret, err := newscheduler.Every(schedule.Seconds).Seconds().NotImmediately().Run(job(schedule)) @@ -3984,7 +3988,7 @@ func runInitEs(ctx context.Context) { r, err := git.Clone(storer, fs, cloneOptions) if err != nil { - log.Printf("[WARNING] Failed loading repo into memory (init): %s", err) + log.Printf("[ERROR] Failed loading repo into memory (init): %s", err) } dir, err := fs.ReadDir("") @@ -4021,7 +4025,7 @@ func runInitEs(ctx context.Context) { } _, err = git.Clone(storer, fs, cloneOptions) if err != nil { - log.Printf("[WARNING] Failed loading repo %s into memory: %s", apis, err) + log.Printf("[ERROR] Failed loading repo %s into memory: %s", apis, err) } else if err == nil && len(workflowapps) < 10 { log.Printf("[INFO] Finished git clone. Looking for updates to the repo.") dir, err := fs.ReadDir("") diff --git a/docker-compose.yml b/docker-compose.yml index ba3f43535..3ca6d2f04 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,6 +43,7 @@ services: - /var/run/docker.sock:/var/run/docker.sock environment: - SHUFFLE_APP_SDK_TIMEOUT=300 + - SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=5 # The amount of concurrent executions Orborus can handle. #- DOCKER_HOST=tcp://docker-socket-proxy:2375 - ENVIRONMENT_NAME=${ENVIRONMENT_NAME} - BASE_URL=http://${OUTER_HOSTNAME}:5001 diff --git a/functions/onprem/orborus/orborus.go b/functions/onprem/orborus/orborus.go index 36523b4f4..b47d11e99 100755 --- a/functions/onprem/orborus/orborus.go +++ b/functions/onprem/orborus/orborus.go @@ -63,7 +63,7 @@ var sleepTime = 2 // Making it work on low-end machines even during busy times :) // May cause some things to run slowly -var maxConcurrency = 3 +var maxConcurrency = 5 // Timeout if something rashes var workerTimeoutEnv = os.Getenv("SHUFFLE_ORBORUS_EXECUTION_TIMEOUT")