From b7638b038ced93fe061b9e44191b50b14dfa1c39 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Sun, 18 May 2025 17:42:00 +0300 Subject: [PATCH 01/23] wip // max error rate in scheduler --- src/guidellm/scheduler/result.py | 2 ++ src/guidellm/scheduler/scheduler.py | 23 +++++++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py index 0f12687f..9d379422 100644 --- a/src/guidellm/scheduler/result.py +++ b/src/guidellm/scheduler/result.py @@ -46,12 +46,14 @@ class SchedulerRunInfo(StandardBaseModel): end_number: float processes: int strategy: SchedulingStrategy + max_error_rate: float created_requests: int = 0 queued_requests: int = 0 scheduled_requests: int = 0 processing_requests: int = 0 completed_requests: int = 0 + errored_requests: int = 0 class SchedulerRequestInfo(StandardBaseModel): diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 06203827..33204729 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -64,12 +64,14 @@ def __init__( self.worker = worker self.request_loader = request_loader + self.error_rate: Optional[float] = None async def run( self, scheduling_strategy: SchedulingStrategy, max_number: Optional[int] = None, max_duration: Optional[float] = None, + max_error_rate: Optional[float] = 0.05, ) -> AsyncGenerator[ Union[SchedulerResult, SchedulerRequestResult[RequestT, ResponseT]], None ]: @@ -98,6 +100,8 @@ async def run( :param max_duration: The maximum duration for the scheduling run. If None, then no limit is set and either the iterator must be exhaustible or the max_number must be set. + :param max_error_rate: The maximum error rate after which the scheduler shuts down. + If not provided a default of 5% i.e 0.05 is used. :return: An asynchronous generator that yields SchedulerResult objects. 
Each SchedulerResult object contains information about the request, the response, and the run information. @@ -109,9 +113,12 @@ async def run( if max_number is not None and max_number < 1: raise ValueError(f"Invalid max_number: {max_number}") - if max_duration is not None and max_duration < 0: raise ValueError(f"Invalid max_duration: {max_duration}") + if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1): + raise ValueError(f"Invalid max_error_rate: {max_error_rate}") + + shutdown_event = multiprocessing.Event() with ( multiprocessing.Manager() as manager, @@ -124,7 +131,7 @@ async def run( manager, executor, scheduling_strategy ) run_info, requests_iter, times_iter = self._run_setup( - futures, scheduling_strategy, max_number, max_duration + futures, scheduling_strategy, max_number, max_duration, max_error_rate ) yield SchedulerResult( type_="run_start", @@ -159,6 +166,8 @@ async def run( run_info, ) if iter_result is not None: + if self._is_max_error_rate_reached(iter_result.run_info): + logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) reached!") yield iter_result # yield control to the event loop @@ -249,6 +258,7 @@ def _run_setup( scheduling_strategy: SchedulingStrategy, max_number: Optional[int], max_duration: Optional[float], + max_error_rate: Optional[float], ) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]: requests_iter = iter(self.request_loader) start_time = time.time() @@ -276,6 +286,7 @@ def _run_setup( end_number=end_number, processes=len(processes), strategy=scheduling_strategy, + max_error_rate=max_error_rate ) return info, requests_iter, times_iter @@ -362,6 +373,9 @@ def _check_result_ready( run_info.processing_requests -= 1 run_info.completed_requests += 1 + if process_response.info.errored: + run_info.errored_requests += 1 + return SchedulerRequestResult( type_="request_complete", run_info=run_info, @@ -371,6 +385,11 @@ def _check_result_ready( ) raise ValueError(f"Invalid process 
response type: {process_response}") + @staticmethod + def _is_max_error_rate_reached(run_info: SchedulerRunInfo) -> bool: + current_error_rate = run_info.errored_requests / run_info.end_number + return current_error_rate > run_info.max_error_rate + async def _stop_processes( self, futures: list[asyncio.Future], From 6059af183ebed636af5a2a7eed4707d943f8e7db Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Mon, 19 May 2025 11:41:33 +0300 Subject: [PATCH 02/23] wip --- src/guidellm/scheduler/scheduler.py | 18 ++++++++++-------- src/guidellm/scheduler/worker.py | 9 +++++++-- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 33204729..cd9231af 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -64,7 +64,6 @@ def __init__( self.worker = worker self.request_loader = request_loader - self.error_rate: Optional[float] = None async def run( self, @@ -118,8 +117,6 @@ async def run( if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1): raise ValueError(f"Invalid max_error_rate: {max_error_rate}") - shutdown_event = multiprocessing.Event() - with ( multiprocessing.Manager() as manager, ProcessPoolExecutor( @@ -127,7 +124,7 @@ async def run( ) as executor, ): requests_iter: Optional[Iterator[Any]] = None - futures, requests_queue, responses_queue = await self._start_processes( + futures, requests_queue, responses_queue, shutdown_event = await self._start_processes( manager, executor, scheduling_strategy ) run_info, requests_iter, times_iter = self._run_setup( @@ -167,7 +164,9 @@ async def run( ) if iter_result is not None: if self._is_max_error_rate_reached(iter_result.run_info): - logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) reached!") + logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) reached, sending " + f"shutdown signal") + shutdown_event.set() yield iter_result # 
yield control to the event loop @@ -191,8 +190,10 @@ async def _start_processes( list[asyncio.Future], multiprocessing.Queue, multiprocessing.Queue, + multiprocessing.Event ]: await self.worker.prepare_multiprocessing() + shutdown_event = multiprocessing.Event() requests_queue = manager.Queue( maxsize=scheduling_strategy.queued_requests_limit ) @@ -229,6 +230,7 @@ async def _start_processes( requests_queue, responses_queue, id_, + shutdown_event, ) ) elif scheduling_strategy.processing_mode == "async": @@ -240,6 +242,7 @@ async def _start_processes( responses_queue, requests_limit, id_, + shutdown_event, ) ) else: @@ -250,7 +253,7 @@ async def _start_processes( await asyncio.sleep(0.1) # give time for processes to start - return futures, requests_queue, responses_queue + return futures, requests_queue, responses_queue, shutdown_event def _run_setup( self, @@ -385,8 +388,7 @@ def _check_result_ready( ) raise ValueError(f"Invalid process response type: {process_response}") - @staticmethod - def _is_max_error_rate_reached(run_info: SchedulerRunInfo) -> bool: + def _is_max_error_rate_reached(self, run_info: SchedulerRunInfo) -> bool: current_error_rate = run_info.errored_requests / run_info.end_number return current_error_rate > run_info.max_error_rate diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index a53b14c2..2dfd4462 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -121,9 +121,13 @@ async def resolve( ... 
async def get_request( - self, requests_queue: multiprocessing.Queue + self, requests_queue: multiprocessing.Queue, shutdown_event: multiprocessing.Event, shutdonen_check_ ) -> Optional[WorkerProcessRequest[RequestT]]: - return await asyncio.to_thread(requests_queue.get) # type: ignore[attr-defined] + def _get_queue_intermittently(request_queue: multiprocessing.Queue, shutdown_event): + try: + + + return await asyncio.to_thread(_get_queue_intermittently()) # type: ignore[attr-defined] async def send_result( self, @@ -222,6 +226,7 @@ def process_loop_asynchronous( results_queue: multiprocessing.Queue, max_concurrency: int, process_id: int, + shutdown_event: multiprocessing.Event, ): async def _process_runner(): pending = asyncio.Semaphore(max_concurrency) From 69a5c9eb5b5a272a0821dfdf569ed0acf5bcaffe Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Mon, 19 May 2025 13:51:05 +0300 Subject: [PATCH 03/23] Revert "wip" This reverts commit 6059af183ebed636af5a2a7eed4707d943f8e7db. --- src/guidellm/scheduler/scheduler.py | 18 ++++++++---------- src/guidellm/scheduler/worker.py | 9 ++------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index cd9231af..33204729 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -64,6 +64,7 @@ def __init__( self.worker = worker self.request_loader = request_loader + self.error_rate: Optional[float] = None async def run( self, @@ -117,6 +118,8 @@ async def run( if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1): raise ValueError(f"Invalid max_error_rate: {max_error_rate}") + shutdown_event = multiprocessing.Event() + with ( multiprocessing.Manager() as manager, ProcessPoolExecutor( @@ -124,7 +127,7 @@ async def run( ) as executor, ): requests_iter: Optional[Iterator[Any]] = None - futures, requests_queue, responses_queue, shutdown_event = await self._start_processes( + futures, 
requests_queue, responses_queue = await self._start_processes( manager, executor, scheduling_strategy ) run_info, requests_iter, times_iter = self._run_setup( @@ -164,9 +167,7 @@ async def run( ) if iter_result is not None: if self._is_max_error_rate_reached(iter_result.run_info): - logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) reached, sending " - f"shutdown signal") - shutdown_event.set() + logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) reached!") yield iter_result # yield control to the event loop @@ -190,10 +191,8 @@ async def _start_processes( list[asyncio.Future], multiprocessing.Queue, multiprocessing.Queue, - multiprocessing.Event ]: await self.worker.prepare_multiprocessing() - shutdown_event = multiprocessing.Event() requests_queue = manager.Queue( maxsize=scheduling_strategy.queued_requests_limit ) @@ -230,7 +229,6 @@ async def _start_processes( requests_queue, responses_queue, id_, - shutdown_event, ) ) elif scheduling_strategy.processing_mode == "async": @@ -242,7 +240,6 @@ async def _start_processes( responses_queue, requests_limit, id_, - shutdown_event, ) ) else: @@ -253,7 +250,7 @@ async def _start_processes( await asyncio.sleep(0.1) # give time for processes to start - return futures, requests_queue, responses_queue, shutdown_event + return futures, requests_queue, responses_queue def _run_setup( self, @@ -388,7 +385,8 @@ def _check_result_ready( ) raise ValueError(f"Invalid process response type: {process_response}") - def _is_max_error_rate_reached(self, run_info: SchedulerRunInfo) -> bool: + @staticmethod + def _is_max_error_rate_reached(run_info: SchedulerRunInfo) -> bool: current_error_rate = run_info.errored_requests / run_info.end_number return current_error_rate > run_info.max_error_rate diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 2dfd4462..a53b14c2 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -121,13 
+121,9 @@ async def resolve( ... async def get_request( - self, requests_queue: multiprocessing.Queue, shutdown_event: multiprocessing.Event, shutdonen_check_ + self, requests_queue: multiprocessing.Queue ) -> Optional[WorkerProcessRequest[RequestT]]: - def _get_queue_intermittently(request_queue: multiprocessing.Queue, shutdown_event): - try: - - - return await asyncio.to_thread(_get_queue_intermittently()) # type: ignore[attr-defined] + return await asyncio.to_thread(requests_queue.get) # type: ignore[attr-defined] async def send_result( self, @@ -226,7 +222,6 @@ def process_loop_asynchronous( results_queue: multiprocessing.Queue, max_concurrency: int, process_id: int, - shutdown_event: multiprocessing.Event, ): async def _process_runner(): pending = asyncio.Semaphore(max_concurrency) From 7795d2c23b0e3449506764102343eafbe486c5a6 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Mon, 19 May 2025 14:28:32 +0300 Subject: [PATCH 04/23] Handle infinite datasets with constant rate --- src/guidellm/request/loader.py | 8 +++++++- src/guidellm/scheduler/result.py | 2 +- src/guidellm/scheduler/scheduler.py | 17 ++++++++++++++--- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py index 50ab3cca..0e54fc45 100644 --- a/src/guidellm/request/loader.py +++ b/src/guidellm/request/loader.py @@ -21,9 +21,14 @@ "GenerativeRequestLoaderDescription", "RequestLoader", "RequestLoaderDescription", + "InfiniteDatasetError" ] +class InfiniteDatasetError(Exception): + pass + + class RequestLoaderDescription(StandardBaseModel): type_: Literal["request_loader"] = "request_loader" @@ -120,7 +125,8 @@ def __len__(self) -> int: if self.iter_type == "finite": return self.num_unique_items() - raise ValueError(f"Unable to determine length of dataset: {self.data}") + assert self.iter_type == "infinite" + raise InfiniteDatasetError(f"Dataset {self.data} is infinite and thus unable to determine length") @property def 
description(self) -> GenerativeRequestLoaderDescription: diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py index 9d379422..a340932d 100644 --- a/src/guidellm/scheduler/result.py +++ b/src/guidellm/scheduler/result.py @@ -43,7 +43,7 @@ class SchedulerRunInfo(StandardBaseModel): start_time: float end_time: float - end_number: float + end_number: float # ToDo: Rename to max_requests & change to int (check all references before) processes: int strategy: SchedulingStrategy max_error_rate: float diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 33204729..d0d06a4a 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -15,6 +15,7 @@ from loguru import logger from guidellm.config import settings +from guidellm.request.loader import InfiniteDatasetError from guidellm.scheduler.result import ( SchedulerRequestResult, SchedulerResult, @@ -166,8 +167,12 @@ async def run( run_info, ) if iter_result is not None: - if self._is_max_error_rate_reached(iter_result.run_info): - logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) reached!") + if iter_result.request_info.errored: + if self._is_max_error_rate_reached(iter_result.run_info): + logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) reached!") + else: + cur_error_rate = iter_result.run_info.errored_requests / iter_result.run_info.end_number + logger.debug(f"Current error rate {cur_error_rate}") yield iter_result # yield control to the event loop @@ -271,7 +276,13 @@ def _run_setup( iter_length = len(self.request_loader) # type: ignore[arg-type] if 0 < iter_length < end_number: end_number = iter_length - except Exception: # noqa: BLE001, S110 + except InfiniteDatasetError: # noqa: BLE001, S110 + if scheduling_strategy.type_ == "constant" and max_duration is not None: + end_number = scheduling_strategy.rate * max_duration + else: + # ToDo: Maybe add poison? 
+ raise + except Exception: pass if end_number == math.inf and end_time is None: From 6d688f0bdbcb01b1735fd77971e2c82a28a38e32 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Mon, 19 May 2025 11:41:33 +0300 Subject: [PATCH 05/23] minor bug fixes --- src/guidellm/benchmark/benchmark.py | 2 +- src/guidellm/scheduler/scheduler.py | 42 +++++++++++++++------- src/guidellm/scheduler/worker.py | 54 ++++++++++++++++++++++++----- 3 files changed, 76 insertions(+), 22 deletions(-) diff --git a/src/guidellm/benchmark/benchmark.py b/src/guidellm/benchmark/benchmark.py index 4e2e09a3..50d2f49c 100644 --- a/src/guidellm/benchmark/benchmark.py +++ b/src/guidellm/benchmark/benchmark.py @@ -701,7 +701,7 @@ def from_stats( *["incomplete"] * len(incomplete), # type: ignore[list-item] *["error"] * len(errored), # type: ignore[list-item] ] - start_time = min(req.start_time for req in total) + start_time = min(req.start_time for req in total) # ToDo: Fix if total is empty end_time = max(req.end_time for req in total) total_with_prompt, total_types_with_prompt = ( diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index d0d06a4a..c58ef363 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -72,7 +72,7 @@ async def run( scheduling_strategy: SchedulingStrategy, max_number: Optional[int] = None, max_duration: Optional[float] = None, - max_error_rate: Optional[float] = 0.05, + max_error_rate: Optional[float] = 0, ) -> AsyncGenerator[ Union[SchedulerResult, SchedulerRequestResult[RequestT, ResponseT]], None ]: @@ -119,8 +119,6 @@ async def run( if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1): raise ValueError(f"Invalid max_error_rate: {max_error_rate}") - shutdown_event = multiprocessing.Event() - with ( multiprocessing.Manager() as manager, ProcessPoolExecutor( @@ -128,9 +126,11 @@ async def run( ) as executor, ): requests_iter: Optional[Iterator[Any]] = None - futures, 
requests_queue, responses_queue = await self._start_processes( - manager, executor, scheduling_strategy + futures, requests_queue, responses_queue, shutdown_event = await self._start_processes( + manager, executor, scheduling_strategy, max_error_rate is not None ) + if shutdown_event: + assert not shutdown_event.is_set() run_info, requests_iter, times_iter = self._run_setup( futures, scheduling_strategy, max_number, max_duration, max_error_rate ) @@ -169,10 +169,15 @@ async def run( if iter_result is not None: if iter_result.request_info.errored: if self._is_max_error_rate_reached(iter_result.run_info): - logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) reached!") - else: - cur_error_rate = iter_result.run_info.errored_requests / iter_result.run_info.end_number - logger.debug(f"Current error rate {cur_error_rate}") + logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) " + f"reached, sending shutdown signal") + shutdown_event.set() + break + # else: + # # ToDo: Delete this else clause + # cur_error_rate = iter_result.run_info.errored_requests / iter_result.run_info.end_number + # logger.info(f"Current error rate {cur_error_rate}") + yield iter_result # yield control to the event loop @@ -192,12 +197,15 @@ async def _start_processes( manager, executor: ProcessPoolExecutor, scheduling_strategy: SchedulingStrategy, + create_shutdown_event: bool = False ) -> tuple[ list[asyncio.Future], multiprocessing.Queue, multiprocessing.Queue, + Optional[multiprocessing.Event] ]: await self.worker.prepare_multiprocessing() + shutdown_event = manager.Event() if create_shutdown_event else None requests_queue = manager.Queue( maxsize=scheduling_strategy.queued_requests_limit ) @@ -207,6 +215,7 @@ async def _start_processes( scheduling_strategy.processes_limit, scheduling_strategy.processing_requests_limit, ) + num_processes = 1 requests_limit_split = ( scheduling_strategy.processing_requests_limit // scheduling_strategy.processes_limit @@ 
-234,6 +243,7 @@ async def _start_processes( requests_queue, responses_queue, id_, + shutdown_event, ) ) elif scheduling_strategy.processing_mode == "async": @@ -245,6 +255,7 @@ async def _start_processes( responses_queue, requests_limit, id_, + shutdown_event, ) ) else: @@ -255,7 +266,7 @@ async def _start_processes( await asyncio.sleep(0.1) # give time for processes to start - return futures, requests_queue, responses_queue + return futures, requests_queue, responses_queue, shutdown_event def _run_setup( self, @@ -278,13 +289,19 @@ def _run_setup( end_number = iter_length except InfiniteDatasetError: # noqa: BLE001, S110 if scheduling_strategy.type_ == "constant" and max_duration is not None: - end_number = scheduling_strategy.rate * max_duration + total_requests_in_max_duration = int(scheduling_strategy.rate * max_duration) + if total_requests_in_max_duration < end_number: + assert total_requests_in_max_duration > 0 + end_number = total_requests_in_max_duration else: - # ToDo: Maybe add poison? 
+ # ToDo: Add poison raise except Exception: pass + if end_number == math.inf and max_error_rate is not None: + raise RuntimeError("Can't ensure max_error_rate since can't calculate total requests count") + if end_number == math.inf and end_time is None: logger.warning( "No end number or end time set, " @@ -409,4 +426,5 @@ async def _stop_processes( for _ in futures: requests_queue.put(None) + logger.debug("Waiting for futures to shut down") await asyncio.gather(*futures) diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index a53b14c2..4515fefa 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -2,10 +2,12 @@ import math import multiprocessing import multiprocessing.queues +import queue import time from abc import ABC, abstractmethod from collections.abc import AsyncGenerator from dataclasses import dataclass +from datetime import timedelta from typing import ( Any, Generic, @@ -121,9 +123,23 @@ async def resolve( ... async def get_request( - self, requests_queue: multiprocessing.Queue + self, requests_queue: multiprocessing.Queue, + shutdown_event: Optional[multiprocessing.Event] = None, + process_id: Optional[int] = None, ) -> Optional[WorkerProcessRequest[RequestT]]: - return await asyncio.to_thread(requests_queue.get) # type: ignore[attr-defined] + if shutdown_event is not None and process_id is None: + logger.warning("shutdown_event is not None and process_id is None which makes it hard to debug") + + def _get_queue_intermittently(): + assert shutdown_event is not None + while True: + try: + return requests_queue.get(timeout=timedelta(seconds=1).total_seconds()) + except queue.Empty: + if shutdown_event.is_set(): + logger.info(f"Shutdown signal received in future {process_id}") + return + return await asyncio.to_thread(_get_queue_intermittently if shutdown_event is not None else requests_queue.get) # type: ignore[attr-defined] async def send_result( self, @@ -149,25 +165,25 @@ async def 
resolve_scheduler_request( scheduled_time=time.time(), process_id=process_id, ) - result: WorkerProcessResult[RequestT, ResponseT] = WorkerProcessResult( + request_scheduled_result: WorkerProcessResult[RequestT, ResponseT] = WorkerProcessResult( type_="request_scheduled", request=request, response=None, info=info, ) - asyncio.create_task(self.send_result(results_queue, result)) + asyncio.create_task(self.send_result(results_queue, request_scheduled_result)) if (wait_time := start_time - time.time()) > 0: await asyncio.sleep(wait_time) info.worker_start = time.time() - result = WorkerProcessResult( + request_start_result = WorkerProcessResult( type_="request_start", request=request, response=None, info=info, ) - asyncio.create_task(self.send_result(results_queue, result)) + asyncio.create_task(self.send_result(results_queue, request_start_result)) status, response = await self.resolve(request, timeout_time) info.worker_end = time.time() @@ -190,11 +206,20 @@ def process_loop_synchronous( requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, process_id: int, + shutdown_event: Optional[multiprocessing.Event] = None, ): async def _process_runner(): while ( - process_request := await self.get_request(requests_queue) + process_request := await self.get_request( + requests_queue=requests_queue, + shutdown_event=shutdown_event, + process_id=process_id, + ) ) is not None: + if shutdown_event and shutdown_event.is_set(): + logger.info(f"Shutdown signal received in future {process_id}") + break + dequeued_time = time.time() await self.resolve_scheduler_request( @@ -222,6 +247,7 @@ def process_loop_asynchronous( results_queue: multiprocessing.Queue, max_concurrency: int, process_id: int, + shutdown_event: Optional[multiprocessing.Event] = None, ): async def _process_runner(): pending = asyncio.Semaphore(max_concurrency) @@ -230,7 +256,10 @@ async def _process_runner(): raise ValueError("Async worker called with max_concurrency < 1") while ( - 
process_request := await self.get_request(requests_queue) + process_request := await self.get_request( + requests_queue=requests_queue, + shutdown_event=shutdown_event, + process_id=process_id) ) is not None: dequeued_time = time.time() @@ -240,6 +269,9 @@ def _task_done(_: asyncio.Task): nonlocal pending pending.release() + if shutdown_event and shutdown_event.is_set(): + logger.info(f"Shutdown signal received in future {process_id}") + break task = asyncio.create_task( self.resolve_scheduler_request( request=process_request.request, @@ -314,12 +346,14 @@ def process_loop_synchronous( requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, process_id: int, + shutdown_event: Optional[multiprocessing.Event] = None ): asyncio.run(self.backend.validate()) super().process_loop_synchronous( requests_queue=requests_queue, results_queue=results_queue, process_id=process_id, + shutdown_event=shutdown_event, ) def process_loop_asynchronous( @@ -328,6 +362,7 @@ def process_loop_asynchronous( results_queue: multiprocessing.Queue, max_concurrency: int, process_id: int, + shutdown_event: Optional[multiprocessing.Event] = None ): asyncio.run(self.backend.validate()) super().process_loop_asynchronous( @@ -335,6 +370,7 @@ def process_loop_asynchronous( results_queue=results_queue, max_concurrency=max_concurrency, process_id=process_id, + shutdown_event=shutdown_event, ) async def resolve( @@ -375,7 +411,7 @@ async def resolve( request_func, request_kwargs = self._create_request_func_kwargs(request) async def _runner(): - # wrap function so we can enforce timeout and + # wrap function so that we can enforce timeout and # still return the latest state from the backend async for resp in request_func(**request_kwargs): # type: ignore[operator] nonlocal response From ede651aca1bc0fd0de65fc869bea09798f1902c2 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 13:21:19 +0300 Subject: [PATCH 06/23] bugfix / last request not yielded --- 
src/guidellm/scheduler/scheduler.py | 71 ++++++++++++++++------------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index c58ef363..628a9ac7 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -72,7 +72,7 @@ async def run( scheduling_strategy: SchedulingStrategy, max_number: Optional[int] = None, max_duration: Optional[float] = None, - max_error_rate: Optional[float] = 0, + max_error_rate: Optional[float] = None, ) -> AsyncGenerator[ Union[SchedulerResult, SchedulerRequestResult[RequestT, ResponseT]], None ]: @@ -140,7 +140,8 @@ async def run( ) try: - while True: + max_error_rate_reached = False + while not max_error_rate_reached: # check errors and raise them for future in futures: if future.done() and (err := future.exception()) is not None: @@ -167,17 +168,13 @@ async def run( run_info, ) if iter_result is not None: - if iter_result.request_info.errored: - if self._is_max_error_rate_reached(iter_result.run_info): - logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) " - f"reached, sending shutdown signal") - shutdown_event.set() - break - # else: - # # ToDo: Delete this else clause - # cur_error_rate = iter_result.run_info.errored_requests / iter_result.run_info.end_number - # logger.info(f"Current error rate {cur_error_rate}") - + if iter_result.request_info.errored \ + and not iter_result.request_info.canceled \ + and self._is_max_error_rate_reached(iter_result.run_info): + shutdown_event.set() + max_error_rate_reached = True + logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) " + f"reached, sending shutdown signal") yield iter_result # yield control to the event loop @@ -280,27 +277,10 @@ def _run_setup( start_time = time.time() times_iter = iter(scheduling_strategy.request_times()) end_time = time.time() + (max_duration or math.inf) - end_number = max_number or math.inf - - try: - # 
update end number if the request loader is finite and less than max - iter_length = len(self.request_loader) # type: ignore[arg-type] - if 0 < iter_length < end_number: - end_number = iter_length - except InfiniteDatasetError: # noqa: BLE001, S110 - if scheduling_strategy.type_ == "constant" and max_duration is not None: - total_requests_in_max_duration = int(scheduling_strategy.rate * max_duration) - if total_requests_in_max_duration < end_number: - assert total_requests_in_max_duration > 0 - end_number = total_requests_in_max_duration - else: - # ToDo: Add poison - raise - except Exception: - pass + end_number = self._determine_total_requests_count(scheduling_strategy, max_duration, max_error_rate, max_number) if end_number == math.inf and max_error_rate is not None: - raise RuntimeError("Can't ensure max_error_rate since can't calculate total requests count") + logger.warning("max_error_rate will be ignored because end_number can not be determined.") if end_number == math.inf and end_time is None: logger.warning( @@ -319,6 +299,33 @@ def _run_setup( return info, requests_iter, times_iter + def _determine_total_requests_count( + self, + scheduling_strategy: SchedulingStrategy, + max_duration: Optional[float], + max_error_rate: Optional[float], + max_number: Optional[int], + ) -> int: + end_number = max_number or math.inf + try: + # update end number if the request loader is finite and less than max + iter_length = len(self.request_loader) # type: ignore[arg-type] + if 0 < iter_length < end_number: + end_number = iter_length + except InfiniteDatasetError: # noqa: BLE001, S110 + if scheduling_strategy.type_ == "constant" and max_duration is not None: + total_requests_in_max_duration = int(scheduling_strategy.rate * max_duration) + if total_requests_in_max_duration < end_number: + assert total_requests_in_max_duration > 0 + end_number = total_requests_in_max_duration + else: + if max_error_rate: + logger.warning() + raise + except Exception: + pass + return 
end_number + def _add_requests( self, requests_iter: Optional[Iterator[Any]], From a17117c7dc3d973fd328d6754083dc9471db01b1 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 13:21:47 +0300 Subject: [PATCH 07/23] Add max error rate to readme, CLI & report --- README.md | 2 ++ src/guidellm/__main__.py | 12 ++++++++++++ src/guidellm/benchmark/aggregator.py | 3 +++ src/guidellm/benchmark/benchmark.py | 9 +++++++++ src/guidellm/benchmark/benchmarker.py | 9 +++++++++ src/guidellm/benchmark/entrypoints.py | 2 ++ 6 files changed, 37 insertions(+) diff --git a/README.md b/README.md index a46fd411..416d3cc1 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,8 @@ The `guidellm benchmark` command is used to run benchmarks against a generative - `--max-requests`: Sets the maximum number of requests for each benchmark run. If not provided, the benchmark will run until `--max-seconds` is reached or the dataset is exhausted. +- `--max-error-rate`: The maximum error rate after which a benchmark will stop. Applicable only for finite deterministic scenarios i.e `rate_type` is `constant` and `--max-seconds` exists OR `--max-requests` exists OR the dataset is finite. If `--max-error-rate` is `None`, benchmarks will continue regardless of error rate. + - `--warmup-percent`: Specifies the percentage of the benchmark to treat as a warmup phase. Requests during this phase are excluded from the final results. - `--cooldown-percent`: Specifies the percentage of the benchmark to treat as a cooldown phase. Requests during this phase are excluded from the final results. diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index f38b11aa..baea9f13 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -163,6 +163,16 @@ def cli(): "If None, will run until max_seconds or the data is exhausted." ), ) +@click.option( + "--max-error-rate", + type=float, + help=( + "The maximum error rate after which a benchmark will stop. 
" + "Applicable only for finite deterministic scenarios i.e rate_type is 'constant' and 'max_seconds' exists OR " + "'max_requests' exists OR the dataset is finite. " + "If None, benchmarks will continue regardless of error rate." + ), +) @click.option( "--warmup-percent", type=float, @@ -242,6 +252,7 @@ def benchmark( rate, max_seconds, max_requests, + max_error_rate, warmup_percent, cooldown_percent, disable_progress, @@ -267,6 +278,7 @@ def benchmark( rate=rate, max_seconds=max_seconds, max_requests=max_requests, + max_error_rate=max_error_rate, warmup_percent=warmup_percent, cooldown_percent=cooldown_percent, show_progress=not disable_progress, diff --git a/src/guidellm/benchmark/aggregator.py b/src/guidellm/benchmark/aggregator.py index 9943f169..9fe80be8 100644 --- a/src/guidellm/benchmark/aggregator.py +++ b/src/guidellm/benchmark/aggregator.py @@ -599,6 +599,8 @@ def compile(self) -> GenerativeBenchmark: and return the compiled object. """ successful, incomplete, errored = self._compile_results() + error_rate = self.requests_stats.totals.errored.total / \ + (self.requests_stats.totals.successful + self.requests_stats.totals.errored.total) return GenerativeBenchmark.from_stats( run_id=self.run_id, @@ -625,6 +627,7 @@ def compile(self) -> GenerativeBenchmark: request_start_time_targeted_delay_avg=self.requests_stats.request_start_time_targeted_delay.mean, request_time_delay_avg=self.requests_stats.request_time_delay.mean, request_time_avg=self.requests_stats.request_time.mean, + error_rate=error_rate, ), worker=self.worker_description, requests_loader=self.request_loader_description, diff --git a/src/guidellm/benchmark/benchmark.py b/src/guidellm/benchmark/benchmark.py index 50d2f49c..dee71fb7 100644 --- a/src/guidellm/benchmark/benchmark.py +++ b/src/guidellm/benchmark/benchmark.py @@ -90,6 +90,9 @@ class BenchmarkArgs(StandardBaseModel): max_duration: Optional[float] = Field( description="The maximum duration in seconds to run this benchmark, if any." 
) + max_error_rate: Optional[float] = Field( + description="Maximum error rate after which a benchmark will stop." + ) warmup_number: Optional[int] = Field( description=( "The number of requests to run for the warmup phase of this benchmark, " @@ -213,6 +216,12 @@ class BenchmarkRunStats(StandardBaseModel): "it was completed." ) ) + error_rate: float = Field( + description=( + "The number of errored requests divided by the number of errored requests. This can be higher " + "than max_error_rate (if applicable) cause it does not take into account incomplete requests." + ) + ) class BenchmarkMetrics(StandardBaseModel): diff --git a/src/guidellm/benchmark/benchmarker.py b/src/guidellm/benchmark/benchmarker.py index 11b6d245..7da25a3b 100644 --- a/src/guidellm/benchmark/benchmarker.py +++ b/src/guidellm/benchmark/benchmarker.py @@ -74,6 +74,11 @@ class BenchmarkerStrategyLimits(StandardBaseModel): description="Maximum duration (in seconds) to process requests per strategy.", ge=0, ) + max_error_rate: Optional[float] = Field( + description="Maximum error rate after which a sync benchmark will stop", + ge=0, + le=1, + ) warmup_percent_per_strategy: Optional[float] = Field( description="Percentage of requests to use for warmup.", ge=0, @@ -148,6 +153,7 @@ async def run( profile: Profile, max_number_per_strategy: Optional[int], max_duration_per_strategy: Optional[float], + max_error_rate: Optional[float], warmup_percent_per_strategy: Optional[float], cooldown_percent_per_strategy: Optional[float], ) -> AsyncGenerator[ @@ -162,6 +168,7 @@ async def run( requests_loader_size=requests_loader_size, max_number_per_strategy=max_number_per_strategy, max_duration_per_strategy=max_duration_per_strategy, + max_error_rate=max_error_rate, warmup_percent_per_strategy=warmup_percent_per_strategy, cooldown_percent_per_strategy=cooldown_percent_per_strategy, ) @@ -196,6 +203,7 @@ async def run( scheduling_strategy=scheduling_strategy, max_number=max_number_per_strategy, 
max_duration=max_duration_per_strategy, + max_error_rate=max_error_rate, ): if result.type_ == "run_start": yield BenchmarkerResult( @@ -321,6 +329,7 @@ def create_benchmark_aggregator( strategy=strategy, max_number=limits.max_number, max_duration=limits.max_duration, + max_error_rate=limits.max_error_rate, warmup_number=limits.warmup_number, warmup_duration=limits.warmup_duration, cooldown_number=limits.cooldown_number, diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py index f252cf27..7e4af8c0 100644 --- a/src/guidellm/benchmark/entrypoints.py +++ b/src/guidellm/benchmark/entrypoints.py @@ -41,6 +41,7 @@ async def benchmark_generative_text( rate: Optional[Union[int, float, list[Union[int, float]]]], max_seconds: Optional[float], max_requests: Optional[int], + max_error_rate: Optional[float], warmup_percent: Optional[float], cooldown_percent: Optional[float], show_progress: bool, @@ -107,6 +108,7 @@ async def benchmark_generative_text( profile=profile, max_number_per_strategy=max_requests, max_duration_per_strategy=max_seconds, + max_error_rate=max_error_rate, warmup_percent_per_strategy=warmup_percent, cooldown_percent_per_strategy=cooldown_percent, ): From 34cb6b6cbd3a1c9efe72f7db8dd8578936dd92cd Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 13:43:34 +0300 Subject: [PATCH 08/23] make max_error_rate optional --- src/guidellm/scheduler/result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py index a340932d..4159f8f3 100644 --- a/src/guidellm/scheduler/result.py +++ b/src/guidellm/scheduler/result.py @@ -46,7 +46,7 @@ class SchedulerRunInfo(StandardBaseModel): end_number: float # ToDo: Rename to max_requests & change to int (check all references before) processes: int strategy: SchedulingStrategy - max_error_rate: float + max_error_rate: Optional[float] = None created_requests: int = 0 queued_requests: int = 0 From 
6289c07e4e8aed4aa7bfbd6223ea401f2ca3993c Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 13:48:12 +0300 Subject: [PATCH 09/23] minor fixes --- src/guidellm/scheduler/scheduler.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 628a9ac7..07d4b2e1 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -277,7 +277,7 @@ def _run_setup( start_time = time.time() times_iter = iter(scheduling_strategy.request_times()) end_time = time.time() + (max_duration or math.inf) - end_number = self._determine_total_requests_count(scheduling_strategy, max_duration, max_error_rate, max_number) + end_number = self._determine_total_requests_count(scheduling_strategy, max_duration, max_number) if end_number == math.inf and max_error_rate is not None: logger.warning("max_error_rate will be ignored because end_number can not be determined.") @@ -303,7 +303,6 @@ def _determine_total_requests_count( self, scheduling_strategy: SchedulingStrategy, max_duration: Optional[float], - max_error_rate: Optional[float], max_number: Optional[int], ) -> int: end_number = max_number or math.inf @@ -318,10 +317,6 @@ def _determine_total_requests_count( if total_requests_in_max_duration < end_number: assert total_requests_in_max_duration > 0 end_number = total_requests_in_max_duration - else: - if max_error_rate: - logger.warning() - raise except Exception: pass return end_number From d5ee01822affd222141d2e6845b921d8f09e467f Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 14:12:11 +0300 Subject: [PATCH 10/23] report error rate bugfix --- src/guidellm/benchmark/aggregator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/guidellm/benchmark/aggregator.py b/src/guidellm/benchmark/aggregator.py index 9fe80be8..b66ae1f7 100644 --- a/src/guidellm/benchmark/aggregator.py +++ 
b/src/guidellm/benchmark/aggregator.py @@ -600,7 +600,7 @@ def compile(self) -> GenerativeBenchmark: """ successful, incomplete, errored = self._compile_results() error_rate = self.requests_stats.totals.errored.total / \ - (self.requests_stats.totals.successful + self.requests_stats.totals.errored.total) + (self.requests_stats.totals.successful.total + self.requests_stats.totals.errored.total) return GenerativeBenchmark.from_stats( run_id=self.run_id, From ce13ef7294d448c0d03a32ef4a70699188617942 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 14:12:24 +0300 Subject: [PATCH 11/23] add current error rate log --- src/guidellm/scheduler/scheduler.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 07d4b2e1..1edc4286 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -168,13 +168,17 @@ async def run( run_info, ) if iter_result is not None: - if iter_result.request_info.errored \ - and not iter_result.request_info.canceled \ - and self._is_max_error_rate_reached(iter_result.run_info): - shutdown_event.set() - max_error_rate_reached = True - logger.info(f"Max_error rate of ({iter_result.run_info.max_error_rate}) " - f"reached, sending shutdown signal") + if iter_result.request_info.errored and not iter_result.request_info.canceled: + current_error_rate = run_info.errored_requests / run_info.end_number + is_over_max_error_rate = run_info.max_error_rate < current_error_rate + + if is_over_max_error_rate: + shutdown_event.set() + max_error_rate_reached = True + logger.info(f"Max error rate of ({iter_result.run_info.max_error_rate}) " + f"reached, sending shutdown signal") + else: + logger.debug(f"Current error rate: {current_error_rate}") yield iter_result # yield control to the event loop @@ -415,11 +419,6 @@ def _check_result_ready( ) raise ValueError(f"Invalid process response type: 
{process_response}") - @staticmethod - def _is_max_error_rate_reached(run_info: SchedulerRunInfo) -> bool: - current_error_rate = run_info.errored_requests / run_info.end_number - return current_error_rate > run_info.max_error_rate - async def _stop_processes( self, futures: list[asyncio.Future], From 9a68a7687360048f62b5fb880a9bce95fe1313ea Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 14:19:03 +0300 Subject: [PATCH 12/23] remove todo --- src/guidellm/scheduler/result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py index 4159f8f3..f899f54a 100644 --- a/src/guidellm/scheduler/result.py +++ b/src/guidellm/scheduler/result.py @@ -43,7 +43,7 @@ class SchedulerRunInfo(StandardBaseModel): start_time: float end_time: float - end_number: float # ToDo: Rename to max_requests & change to int (check all references before) + end_number: float processes: int strategy: SchedulingStrategy max_error_rate: Optional[float] = None From 6dd313de3b4275ec87fdb9c76685602d6a806e76 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 15:58:03 +0300 Subject: [PATCH 13/23] Fix tests --- src/guidellm/benchmark/output.py | 1 + src/guidellm/scheduler/scheduler.py | 6 ++++-- tests/unit/benchmark/test_output.py | 2 +- tests/unit/mock_benchmark.py | 2 ++ 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/guidellm/benchmark/output.py b/src/guidellm/benchmark/output.py index 4847160d..33b1efc2 100644 --- a/src/guidellm/benchmark/output.py +++ b/src/guidellm/benchmark/output.py @@ -419,6 +419,7 @@ def benchmarks_args_str(self) -> str: { "max_number": args.max_number, "max_duration": args.max_duration, + "max_error_rate": args.max_error_rate, "warmup_number": args.warmup_number, "warmup_duration": args.warmup_duration, "cooldown_number": args.cooldown_number, diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 
1edc4286..3dd873d0 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -315,13 +315,15 @@ def _determine_total_requests_count( iter_length = len(self.request_loader) # type: ignore[arg-type] if 0 < iter_length < end_number: end_number = iter_length - except InfiniteDatasetError: # noqa: BLE001, S110 + except InfiniteDatasetError: + # Only when RPS is constant and duration is capped we can determine the total + # amount of requests that are supposed to be sent if scheduling_strategy.type_ == "constant" and max_duration is not None: total_requests_in_max_duration = int(scheduling_strategy.rate * max_duration) if total_requests_in_max_duration < end_number: assert total_requests_in_max_duration > 0 end_number = total_requests_in_max_duration - except Exception: + except Exception: # noqa: BLE001, S110 pass return end_number diff --git a/tests/unit/benchmark/test_output.py b/tests/unit/benchmark/test_output.py index 9076834b..e3114491 100644 --- a/tests/unit/benchmark/test_output.py +++ b/tests/unit/benchmark/test_output.py @@ -113,7 +113,7 @@ def test_console_benchmarks_args_str(): mock_benchmark = mock_generative_benchmark() console.benchmarks = [mock_benchmark] assert console.benchmarks_args_str == ( - "max_number=None, max_duration=10.0, warmup_number=None, " + "max_number=None, max_duration=10.0, max_error_rate=0.05, warmup_number=None, " "warmup_duration=None, cooldown_number=None, cooldown_duration=None" ) diff --git a/tests/unit/mock_benchmark.py b/tests/unit/mock_benchmark.py index 81364fa1..3c360c68 100644 --- a/tests/unit/mock_benchmark.py +++ b/tests/unit/mock_benchmark.py @@ -221,6 +221,7 @@ def mock_generative_benchmark() -> GenerativeBenchmark: strategy=SynchronousStrategy(), max_number=None, max_duration=10.0, + max_error_rate=0.05, warmup_number=None, warmup_duration=None, cooldown_number=None, @@ -245,6 +246,7 @@ def mock_generative_benchmark() -> GenerativeBenchmark: 
request_start_time_targeted_delay_avg=1.2827096836907523, request_time_delay_avg=0.0004316908972603934, request_time_avg=1.426228676523481, + error_rate=0.345346, ), worker=GenerativeRequestsWorkerDescription( backend_type="openai_http", From 3697b308cd87c34e84370fdd6da04ef29c1a5ae9 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 16:14:49 +0300 Subject: [PATCH 14/23] Pre CR fixes --- README.md | 2 +- src/guidellm/__main__.py | 2 +- src/guidellm/benchmark/benchmark.py | 2 +- src/guidellm/scheduler/scheduler.py | 5 +++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 416d3cc1..0988c70e 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,7 @@ The `guidellm benchmark` command is used to run benchmarks against a generative - `--max-requests`: Sets the maximum number of requests for each benchmark run. If not provided, the benchmark will run until `--max-seconds` is reached or the dataset is exhausted. -- `--max-error-rate`: The maximum error rate after which a benchmark will stop. Applicable only for finite deterministic scenarios i.e `rate_type` is `constant` and `--max-seconds` exists OR `--max-requests` exists OR the dataset is finite. If `--max-error-rate` is `None`, benchmarks will continue regardless of error rate. +- `--max-error-rate`: The maximum error rate after which a benchmark will stop. Applicable only for finite deterministic scenarios i.e `rate_type` is `constant` and `--max-seconds` exists OR `--max-requests` exists OR the dataset is finite. If `--max-error-rate` is `None` or not applicable, benchmarks will continue regardless of error rate. - `--warmup-percent`: Specifies the percentage of the benchmark to treat as a warmup phase. Requests during this phase are excluded from the final results. 
diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index baea9f13..5628857b 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -170,7 +170,7 @@ def cli(): "The maximum error rate after which a benchmark will stop. " "Applicable only for finite deterministic scenarios i.e rate_type is 'constant' and 'max_seconds' exists OR " "'max_requests' exists OR the dataset is finite. " - "If None, benchmarks will continue regardless of error rate." + "If None or not applicable, benchmarks will continue regardless of error rate." ), ) @click.option( diff --git a/src/guidellm/benchmark/benchmark.py b/src/guidellm/benchmark/benchmark.py index dee71fb7..dd391bfc 100644 --- a/src/guidellm/benchmark/benchmark.py +++ b/src/guidellm/benchmark/benchmark.py @@ -710,7 +710,7 @@ def from_stats( *["incomplete"] * len(incomplete), # type: ignore[list-item] *["error"] * len(errored), # type: ignore[list-item] ] - start_time = min(req.start_time for req in total) # ToDo: Fix if total is empty + start_time = min(req.start_time for req in total) end_time = max(req.end_time for req in total) total_with_prompt, total_types_with_prompt = ( diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 3dd873d0..c92bdc76 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -102,7 +102,8 @@ async def run( If None, then no limit is set and either the iterator must be exhaustible or the max_number must be set. :param max_error_rate: The maximum error rate after which the scheduler shuts down. - If not provided a default of 5% i.e 0.05 is used. + Only applicable in benchmarks with finite deterministic number of requests. + If None or not applicable then scheduler will continue regardless of errors. :return: An asynchronous generator that yields SchedulerResult objects. Each SchedulerResult object contains information about the request, the response, and the run information. 
@@ -130,7 +131,7 @@ async def run( manager, executor, scheduling_strategy, max_error_rate is not None ) if shutdown_event: - assert not shutdown_event.is_set() + assert not shutdown_event.is_set(), "shutdown_event is set before starting scheduling" run_info, requests_iter, times_iter = self._run_setup( futures, scheduling_strategy, max_number, max_duration, max_error_rate ) From 2fe64c7092265be8e9a2f6543fc7af9968930703 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 16:19:04 +0300 Subject: [PATCH 15/23] CR Fixes --- src/guidellm/benchmark/benchmarker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/guidellm/benchmark/benchmarker.py b/src/guidellm/benchmark/benchmarker.py index 7da25a3b..ecb721f7 100644 --- a/src/guidellm/benchmark/benchmarker.py +++ b/src/guidellm/benchmark/benchmarker.py @@ -75,7 +75,7 @@ class BenchmarkerStrategyLimits(StandardBaseModel): ge=0, ) max_error_rate: Optional[float] = Field( - description="Maximum error rate after which a sync benchmark will stop", + description="Maximum error rate after which a benchmark will stop", ge=0, le=1, ) From b54ab14d668a8af007cf9382b29917ccee994764 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 18:14:40 +0300 Subject: [PATCH 16/23] Lint fixes --- src/guidellm/__main__.py | 3 +- src/guidellm/benchmark/aggregator.py | 10 ++- src/guidellm/benchmark/benchmark.py | 6 +- src/guidellm/request/__init__.py | 2 + src/guidellm/request/loader.py | 11 ++-- src/guidellm/scheduler/scheduler.py | 93 ++++++++++++++++------------ src/guidellm/scheduler/worker.py | 21 +++++-- 7 files changed, 93 insertions(+), 53 deletions(-) diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 5628857b..8a1b9ff0 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -168,7 +168,8 @@ def cli(): type=float, help=( "The maximum error rate after which a benchmark will stop. 
" - "Applicable only for finite deterministic scenarios i.e rate_type is 'constant' and 'max_seconds' exists OR " + "Applicable only for finite deterministic scenarios i.e " + "rate_type is 'constant' and 'max_seconds' exists OR " "'max_requests' exists OR the dataset is finite. " "If None or not applicable, benchmarks will continue regardless of error rate." ), diff --git a/src/guidellm/benchmark/aggregator.py b/src/guidellm/benchmark/aggregator.py index b66ae1f7..73ae622a 100644 --- a/src/guidellm/benchmark/aggregator.py +++ b/src/guidellm/benchmark/aggregator.py @@ -599,8 +599,8 @@ def compile(self) -> GenerativeBenchmark: and return the compiled object. """ successful, incomplete, errored = self._compile_results() - error_rate = self.requests_stats.totals.errored.total / \ - (self.requests_stats.totals.successful.total + self.requests_stats.totals.errored.total) + + error_rate = self._calculate_error_rate() return GenerativeBenchmark.from_stats( run_id=self.run_id, @@ -634,6 +634,12 @@ def compile(self) -> GenerativeBenchmark: extras=self.extras, ) + def _calculate_error_rate(self) -> float: + total_successful = self.requests_stats.totals.successful.total + total_errored = self.requests_stats.totals.errored.total + total_sent = total_errored + total_successful + return total_errored / total_sent + def _compile_results( self, ) -> tuple[ diff --git a/src/guidellm/benchmark/benchmark.py b/src/guidellm/benchmark/benchmark.py index dd391bfc..40ffefba 100644 --- a/src/guidellm/benchmark/benchmark.py +++ b/src/guidellm/benchmark/benchmark.py @@ -218,8 +218,10 @@ class BenchmarkRunStats(StandardBaseModel): ) error_rate: float = Field( description=( - "The number of errored requests divided by the number of errored requests. This can be higher " - "than max_error_rate (if applicable) cause it does not take into account incomplete requests." + "The number of errored requests divided by the number " + "of errored requests. 
This can be higher than max_error_rate " + "(if applicable) cause it does not take into " + "account incomplete requests." ) ) diff --git a/src/guidellm/request/__init__.py b/src/guidellm/request/__init__.py index db3059cc..606fb897 100644 --- a/src/guidellm/request/__init__.py +++ b/src/guidellm/request/__init__.py @@ -1,6 +1,7 @@ from .loader import ( GenerativeRequestLoader, GenerativeRequestLoaderDescription, + GetInfiniteDatasetLengthError, RequestLoader, RequestLoaderDescription, ) @@ -10,6 +11,7 @@ "GenerationRequest", "GenerativeRequestLoader", "GenerativeRequestLoaderDescription", + "GetInfiniteDatasetLengthError", "RequestLoader", "RequestLoaderDescription", ] diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py index 0e54fc45..62bd17ea 100644 --- a/src/guidellm/request/loader.py +++ b/src/guidellm/request/loader.py @@ -19,13 +19,13 @@ __all__ = [ "GenerativeRequestLoader", "GenerativeRequestLoaderDescription", + "GetInfiniteDatasetLengthError", "RequestLoader", "RequestLoaderDescription", - "InfiniteDatasetError" ] -class InfiniteDatasetError(Exception): +class GetInfiniteDatasetLengthError(Exception): pass @@ -125,8 +125,11 @@ def __len__(self) -> int: if self.iter_type == "finite": return self.num_unique_items() - assert self.iter_type == "infinite" - raise InfiniteDatasetError(f"Dataset {self.data} is infinite and thus unable to determine length") + if self.iter_type != "infinite": + raise ValueError(f"Invalid iter_type {self.iter_type}") + raise GetInfiniteDatasetLengthError(f"Dataset {self.data} is " + f"infinite and thus " + f"unable to determine length") @property def description(self) -> GenerativeRequestLoaderDescription: diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index c92bdc76..6bdcbcfe 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -15,7 +15,7 @@ from loguru import logger from guidellm.config import settings -from 
guidellm.request.loader import InfiniteDatasetError +from guidellm.request.loader import GetInfiniteDatasetLengthError from guidellm.scheduler.result import ( SchedulerRequestResult, SchedulerResult, @@ -101,24 +101,15 @@ async def run( :param max_duration: The maximum duration for the scheduling run. If None, then no limit is set and either the iterator must be exhaustible or the max_number must be set. - :param max_error_rate: The maximum error rate after which the scheduler shuts down. + :param max_error_rate: The maximum error rate after which the + scheduler shuts down. Only applicable in benchmarks with finite deterministic number of requests. If None or not applicable then scheduler will continue regardless of errors. :return: An asynchronous generator that yields SchedulerResult objects. Each SchedulerResult object contains information about the request, the response, and the run information. """ - if scheduling_strategy is None or not isinstance( - scheduling_strategy, SchedulingStrategy - ): - raise ValueError(f"Invalid scheduling strategy: {scheduling_strategy}") - - if max_number is not None and max_number < 1: - raise ValueError(f"Invalid max_number: {max_number}") - if max_duration is not None and max_duration < 0: - raise ValueError(f"Invalid max_duration: {max_duration}") - if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1): - raise ValueError(f"Invalid max_error_rate: {max_error_rate}") + self._validate_scheduler_params(scheduling_strategy, max_duration, max_error_rate, max_number) with ( multiprocessing.Manager() as manager, @@ -127,11 +118,13 @@ async def run( ) as executor, ): requests_iter: Optional[Iterator[Any]] = None - futures, requests_queue, responses_queue, shutdown_event = await self._start_processes( - manager, executor, scheduling_strategy, max_error_rate is not None - ) - if shutdown_event: - assert not shutdown_event.is_set(), "shutdown_event is set before starting scheduling" + futures, requests_queue, 
responses_queue, shutdown_event = \ + await self._start_processes( + manager, executor, scheduling_strategy, max_error_rate is not None) + if shutdown_event and shutdown_event.is_set(): + raise RuntimeError( + "shutdown_event is set before starting scheduling" + ) run_info, requests_iter, times_iter = self._run_setup( futures, scheduling_strategy, max_number, max_duration, max_error_rate ) @@ -169,17 +162,14 @@ async def run( run_info, ) if iter_result is not None: - if iter_result.request_info.errored and not iter_result.request_info.canceled: - current_error_rate = run_info.errored_requests / run_info.end_number - is_over_max_error_rate = run_info.max_error_rate < current_error_rate - - if is_over_max_error_rate: - shutdown_event.set() - max_error_rate_reached = True - logger.info(f"Max error rate of ({iter_result.run_info.max_error_rate}) " - f"reached, sending shutdown signal") - else: - logger.debug(f"Current error rate: {current_error_rate}") + if iter_result.request_info.errored \ + and not iter_result.request_info.canceled \ + and self._is_max_error_rate_reached(iter_result.run_info): + shutdown_event.set() + max_error_rate_reached = True + logger.info(f"Max error rate of " + f"({iter_result.run_info.max_error_rate}) " + f"reached, sending shutdown signal") yield iter_result # yield control to the event loop @@ -194,6 +184,28 @@ async def run( await self._stop_processes(futures, requests_queue) + def _validate_scheduler_params( + self, + scheduling_strategy: SchedulingStrategy, + max_duration: Optional[float], + max_error_rate: Optional[float], + max_number: Optional[int] + ) -> None: + if scheduling_strategy is None or not isinstance( + scheduling_strategy, SchedulingStrategy + ): + raise ValueError(f"Invalid scheduling strategy: {scheduling_strategy}") + if max_number is not None and max_number < 1: + raise ValueError(f"Invalid max_number: {max_number}") + if max_duration is not None and max_duration < 0: + raise ValueError(f"Invalid max_duration: 
{max_duration}") + if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1): + raise ValueError(f"Invalid max_error_rate: {max_error_rate}") + + def _is_max_error_rate_reached(self, run_info) -> bool: + current_error_rate = run_info.errored_requests / run_info.end_number + return run_info.max_error_rate < current_error_rate + async def _start_processes( self, manager, @@ -282,10 +294,13 @@ def _run_setup( start_time = time.time() times_iter = iter(scheduling_strategy.request_times()) end_time = time.time() + (max_duration or math.inf) - end_number = self._determine_total_requests_count(scheduling_strategy, max_duration, max_number) + end_number = self._determine_total_requests_count( + scheduling_strategy, max_duration, max_number + ) if end_number == math.inf and max_error_rate is not None: - logger.warning("max_error_rate will be ignored because end_number can not be determined.") + logger.warning("max_error_rate will be ignored " + "because end_number can not be determined.") if end_number == math.inf and end_time is None: logger.warning( @@ -312,17 +327,19 @@ def _determine_total_requests_count( ) -> int: end_number = max_number or math.inf try: - # update end number if the request loader is finite and less than max + # update end_number if the request_loader is finite and less than max_number iter_length = len(self.request_loader) # type: ignore[arg-type] if 0 < iter_length < end_number: end_number = iter_length - except InfiniteDatasetError: - # Only when RPS is constant and duration is capped we can determine the total - # amount of requests that are supposed to be sent + except GetInfiniteDatasetLengthError: + # Only when RPS is constant and duration is + # capped we can determine the total amount of requests + # that are supposed to be sent if scheduling_strategy.type_ == "constant" and max_duration is not None: - total_requests_in_max_duration = int(scheduling_strategy.rate * max_duration) - if total_requests_in_max_duration < 
end_number: - assert total_requests_in_max_duration > 0 + total_requests_in_max_duration = int( + scheduling_strategy.rate * max_duration + ) + if 0 < total_requests_in_max_duration < end_number: end_number = total_requests_in_max_duration except Exception: # noqa: BLE001, S110 pass diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 4515fefa..800207a0 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -128,18 +128,26 @@ async def get_request( process_id: Optional[int] = None, ) -> Optional[WorkerProcessRequest[RequestT]]: if shutdown_event is not None and process_id is None: - logger.warning("shutdown_event is not None and process_id is None which makes it hard to debug") + logger.warning("shutdown_event is not None and process_id " + "is None which makes it hard to debug") def _get_queue_intermittently(): - assert shutdown_event is not None + if shutdown_event is None: + raise ValueError("Shouldn't use _get_queue_intermittently " + "if there's no shutdown_even") while True: try: - return requests_queue.get(timeout=timedelta(seconds=1).total_seconds()) + get_timeout = timedelta(seconds=1).total_seconds() + return requests_queue.get(timeout=get_timeout) except queue.Empty: if shutdown_event.is_set(): logger.info(f"Shutdown signal received in future {process_id}") - return - return await asyncio.to_thread(_get_queue_intermittently if shutdown_event is not None else requests_queue.get) # type: ignore[attr-defined] + return None + + get_method = _get_queue_intermittently \ + if shutdown_event is not None \ + else requests_queue.get + return await asyncio.to_thread(get_method) # type: ignore[attr-defined] async def send_result( self, @@ -165,7 +173,8 @@ async def resolve_scheduler_request( scheduled_time=time.time(), process_id=process_id, ) - request_scheduled_result: WorkerProcessResult[RequestT, ResponseT] = WorkerProcessResult( + request_scheduled_result: WorkerProcessResult[RequestT, 
ResponseT] = \ + WorkerProcessResult( type_="request_scheduled", request=request, response=None, From b502c9488cd497831a821f91291a42eecfe01c33 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Wed, 21 May 2025 18:16:22 +0300 Subject: [PATCH 17/23] Lint fixes --- src/guidellm/scheduler/scheduler.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 6bdcbcfe..db505181 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -109,7 +109,10 @@ async def run( Each SchedulerResult object contains information about the request, the response, and the run information. """ - self._validate_scheduler_params(scheduling_strategy, max_duration, max_error_rate, max_number) + self._validate_scheduler_params(scheduling_strategy, + max_duration, + max_error_rate, + max_number) with ( multiprocessing.Manager() as manager, @@ -163,8 +166,8 @@ async def run( ) if iter_result is not None: if iter_result.request_info.errored \ - and not iter_result.request_info.canceled \ - and self._is_max_error_rate_reached(iter_result.run_info): + and not iter_result.request_info.canceled \ + and self._is_max_error_rate_reached(iter_result.run_info): shutdown_event.set() max_error_rate_reached = True logger.info(f"Max error rate of " From 332ef08a5084c3846a38d444c815196cf3190266 Mon Sep 17 00:00:00 2001 From: markvaykhansky Date: Wed, 21 May 2025 19:14:21 +0300 Subject: [PATCH 18/23] better var name --- src/guidellm/benchmark/aggregator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/guidellm/benchmark/aggregator.py b/src/guidellm/benchmark/aggregator.py index 73ae622a..cd725326 100644 --- a/src/guidellm/benchmark/aggregator.py +++ b/src/guidellm/benchmark/aggregator.py @@ -637,8 +637,8 @@ def compile(self) -> GenerativeBenchmark: def _calculate_error_rate(self) -> float: total_successful = 
self.requests_stats.totals.successful.total total_errored = self.requests_stats.totals.errored.total - total_sent = total_errored + total_successful - return total_errored / total_sent + total_finished = total_errored + total_successful + return total_errored / total_finished def _compile_results( self, From c2fd813233fe0cdd253796205464e6e6167deeff Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Thu, 22 May 2025 08:33:33 +0300 Subject: [PATCH 19/23] Type fixes, typos & bugfixes --- src/guidellm/__main__.py | 4 ++-- src/guidellm/scheduler/scheduler.py | 16 ++++++++++++---- src/guidellm/scheduler/worker.py | 15 ++++++++------- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 8a1b9ff0..bfa566b2 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -179,7 +179,7 @@ def cli(): type=float, default=None, help=( - "The percent of the benchmark (based on max-seconds, max-requets, " + "The percent of the benchmark (based on max-seconds, max-requests, " "or lenth of dataset) to run as a warmup and not include in the final results. " "Defaults to None." ), @@ -188,7 +188,7 @@ def cli(): "--cooldown-percent", type=float, help=( - "The percent of the benchmark (based on max-seconds, max-requets, or lenth " + "The percent of the benchmark (based on max-seconds, max-requests, or length " "of dataset) to run as a cooldown and not include in the final results. " "Defaults to None." 
), diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index db505181..ceffecd3 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -5,6 +5,7 @@ import time from collections.abc import AsyncGenerator, Iterable, Iterator from concurrent.futures import ProcessPoolExecutor +from multiprocessing.synchronize import Event as MultiprocessingEvent from typing import ( Any, Generic, @@ -168,11 +169,15 @@ async def run( if iter_result.request_info.errored \ and not iter_result.request_info.canceled \ and self._is_max_error_rate_reached(iter_result.run_info): + if shutdown_event is None: + raise RuntimeError("We've reached max_error_rate " + "but shutdown_event is corrupt") shutdown_event.set() max_error_rate_reached = True logger.info(f"Max error rate of " f"({iter_result.run_info.max_error_rate}) " f"reached, sending shutdown signal") + logger.info("Itter is not None") yield iter_result # yield control to the event loop @@ -205,8 +210,12 @@ def _validate_scheduler_params( if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1): raise ValueError(f"Invalid max_error_rate: {max_error_rate}") - def _is_max_error_rate_reached(self, run_info) -> bool: + def _is_max_error_rate_reached(self, run_info: SchedulerRunInfo) -> bool: + if run_info.max_error_rate is None: + return False current_error_rate = run_info.errored_requests / run_info.end_number + logger.info(f"Current error rate {current_error_rate} " + f"i.e total_finished [success / error] / max total possible") return run_info.max_error_rate < current_error_rate async def _start_processes( @@ -219,7 +228,7 @@ async def _start_processes( list[asyncio.Future], multiprocessing.Queue, multiprocessing.Queue, - Optional[multiprocessing.Event] + Optional[MultiprocessingEvent] ]: await self.worker.prepare_multiprocessing() shutdown_event = manager.Event() if create_shutdown_event else None @@ -232,7 +241,6 @@ async def _start_processes( 
scheduling_strategy.processes_limit, scheduling_strategy.processing_requests_limit, ) - num_processes = 1 requests_limit_split = ( scheduling_strategy.processing_requests_limit // scheduling_strategy.processes_limit @@ -327,7 +335,7 @@ def _determine_total_requests_count( scheduling_strategy: SchedulingStrategy, max_duration: Optional[float], max_number: Optional[int], - ) -> int: + ) -> Union[int, float]: end_number = max_number or math.inf try: # update end_number if the request_loader is finite and less than max_number diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 800207a0..f4072c5d 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -1,6 +1,5 @@ import asyncio import math -import multiprocessing import multiprocessing.queues import queue import time @@ -8,6 +7,7 @@ from collections.abc import AsyncGenerator from dataclasses import dataclass from datetime import timedelta +from multiprocessing.synchronize import Event as MultiprocessingEvent from typing import ( Any, Generic, @@ -124,7 +124,7 @@ async def resolve( async def get_request( self, requests_queue: multiprocessing.Queue, - shutdown_event: Optional[multiprocessing.Event] = None, + shutdown_event: Optional[MultiprocessingEvent] = None, process_id: Optional[int] = None, ) -> Optional[WorkerProcessRequest[RequestT]]: if shutdown_event is not None and process_id is None: @@ -186,7 +186,8 @@ async def resolve_scheduler_request( await asyncio.sleep(wait_time) info.worker_start = time.time() - request_start_result = WorkerProcessResult( + request_start_result: WorkerProcessResult[RequestT, ResponseT] = \ + WorkerProcessResult( type_="request_start", request=request, response=None, @@ -215,7 +216,7 @@ def process_loop_synchronous( requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, process_id: int, - shutdown_event: Optional[multiprocessing.Event] = None, + shutdown_event: Optional[MultiprocessingEvent] = None, 
): async def _process_runner(): while ( @@ -256,7 +257,7 @@ def process_loop_asynchronous( results_queue: multiprocessing.Queue, max_concurrency: int, process_id: int, - shutdown_event: Optional[multiprocessing.Event] = None, + shutdown_event: Optional[MultiprocessingEvent] = None, ): async def _process_runner(): pending = asyncio.Semaphore(max_concurrency) @@ -355,7 +356,7 @@ def process_loop_synchronous( requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, process_id: int, - shutdown_event: Optional[multiprocessing.Event] = None + shutdown_event: Optional[MultiprocessingEvent] = None ): asyncio.run(self.backend.validate()) super().process_loop_synchronous( @@ -371,7 +372,7 @@ def process_loop_asynchronous( results_queue: multiprocessing.Queue, max_concurrency: int, process_id: int, - shutdown_event: Optional[multiprocessing.Event] = None + shutdown_event: Optional[MultiprocessingEvent] = None ): asyncio.run(self.backend.validate()) super().process_loop_asynchronous( From 4bda8cf20c118ca3ecf0dc6b3d11813a0556e5db Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Thu, 22 May 2025 10:11:42 +0300 Subject: [PATCH 20/23] Remove spammy log + bugfix --- src/guidellm/scheduler/scheduler.py | 1 - src/guidellm/scheduler/worker.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index ceffecd3..4097cfed 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -177,7 +177,6 @@ async def run( logger.info(f"Max error rate of " f"({iter_result.run_info.max_error_rate}) " f"reached, sending shutdown signal") - logger.info("Itter is not None") yield iter_result # yield control to the event loop diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index f4072c5d..bc77a11b 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -281,6 +281,7 @@ def _task_done(_: 
asyncio.Task): if shutdown_event and shutdown_event.is_set(): logger.info(f"Shutdown signal received in future {process_id}") + pending.release() break task = asyncio.create_task( self.resolve_scheduler_request( From 26319a5c89fba8105709a46811fd95d5b5f1f33d Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Thu, 22 May 2025 15:39:57 +0300 Subject: [PATCH 21/23] Sleep interminetly --- src/guidellm/scheduler/worker.py | 47 +++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index bc77a11b..41b4423d 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -165,6 +165,7 @@ async def resolve_scheduler_request( timeout_time: float, results_queue: multiprocessing.Queue, process_id: int, + shutdown_event: Optional[MultiprocessingEvent] = None, ): info = SchedulerRequestInfo( targeted_start_time=start_time, @@ -183,7 +184,21 @@ async def resolve_scheduler_request( asyncio.create_task(self.send_result(results_queue, request_scheduled_result)) if (wait_time := start_time - time.time()) > 0: - await asyncio.sleep(wait_time) + if shutdown_event is None: + await asyncio.sleep(wait_time) + else: + shutdown_signal_received = \ + await self._sleep_intermittently_until_timestamp_or_shutdown( + sleep_until_timestamp=start_time, + shutdown_event=shutdown_event, + ) + if shutdown_signal_received: + logger.info( + "Received shutdown signal " + "while waiting to start " + f"|| Process ID {process_id}" + ) + return info.worker_start = time.time() request_start_result: WorkerProcessResult[RequestT, ResponseT] = \ @@ -211,6 +226,18 @@ async def resolve_scheduler_request( ) asyncio.create_task(self.send_result(results_queue, result)) + async def _sleep_intermittently_until_timestamp_or_shutdown( + self, + sleep_until_timestamp: float, + shutdown_event: MultiprocessingEvent, + ) -> bool: + delta = timedelta(seconds=10).total_seconds() + while 
time.time() < sleep_until_timestamp: + await asyncio.sleep(delta) + if shutdown_event.is_set(): + return True + return False + def process_loop_synchronous( self, requests_queue: multiprocessing.Queue, @@ -240,6 +267,7 @@ async def _process_runner(): timeout_time=process_request.timeout_time, results_queue=results_queue, process_id=process_id, + shutdown_event=shutdown_event, ) try: @@ -271,10 +299,26 @@ async def _process_runner(): shutdown_event=shutdown_event, process_id=process_id) ) is not None: + if shutdown_event and shutdown_event.is_set(): + logger.error("This shouldn't happen! " + "We should catch the " + "shutdown in the get wrapper") + logger.info(f"Shutdown signal received" + f" in future {process_id}") + break + dequeued_time = time.time() + logger.debug(f"Dequeued Process ID {process_id} || " + f"Timestamp {dequeued_time} || " + f"Semaphore {pending._value}/{max_concurrency}") await pending.acquire() + lock_acquired_at = time.time() + logger.debug(f"Lock acquired Process ID {process_id} ||" + f" Timestamp {lock_acquired_at} ||" + f" Semaphore {pending._value}/{max_concurrency}") + def _task_done(_: asyncio.Task): nonlocal pending pending.release() @@ -292,6 +336,7 @@ def _task_done(_: asyncio.Task): timeout_time=process_request.timeout_time, results_queue=results_queue, process_id=process_id, + shutdown_event=shutdown_event, ) ) task.add_done_callback(_task_done) From 09925a40c9d3ef1fc4e6ba1d23ed88442fd173ed Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Thu, 22 May 2025 15:43:51 +0300 Subject: [PATCH 22/23] Add missing error log --- src/guidellm/scheduler/worker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 41b4423d..6883f739 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -254,6 +254,9 @@ async def _process_runner(): ) ) is not None: if shutdown_event and shutdown_event.is_set(): + logger.error("This shouldn't happen! 
" + "We should catch the " + "shutdown in the get wrapper") logger.info(f"Shutdown signal received in future {process_id}") break From fa562587b0bfdcf2441eb6ef05a88e21c1b97bd0 Mon Sep 17 00:00:00 2001 From: mark-vaykhansky Date: Thu, 22 May 2025 16:22:12 +0300 Subject: [PATCH 23/23] linting fixes --- src/guidellm/scheduler/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 6883f739..f37b7708 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -313,14 +313,14 @@ async def _process_runner(): dequeued_time = time.time() logger.debug(f"Dequeued Process ID {process_id} || " f"Timestamp {dequeued_time} || " - f"Semaphore {pending._value}/{max_concurrency}") + f"Semaphore {pending._value}/{max_concurrency}") # noqa: SLF001 await pending.acquire() lock_acquired_at = time.time() logger.debug(f"Lock acquired Process ID {process_id} ||" f" Timestamp {lock_acquired_at} ||" - f" Semaphore {pending._value}/{max_concurrency}") + f" Semaphore {pending._value}/{max_concurrency}") # noqa: SLF001 def _task_done(_: asyncio.Task): nonlocal pending