diff --git a/config.yaml b/config.yaml index 03c2d2f5..3d0a8616 100644 --- a/config.yaml +++ b/config.yaml @@ -6,15 +6,16 @@ storage: # TODO type: local dataset: file: "datasets/openorca_large_subset_011.jsonl" - max_queries: 1000 + max_queries: 4000 min_input_tokens: 0 max_input_tokens: 1024 min_output_tokens: 0 - max_output_tokens: 1024 + max_output_tokens: 512 max_sequence_tokens: 2048 load_options: - type: constant #Future options: loadgen, stair-step - concurrency: 1 + type: rps #Options: concurrency, rps, loadgen, stair-step + concurrency: 4 + rps: 16 duration: 20 # In seconds. Maybe in future support "100s" "10m", etc... plugin: "openai_plugin" plugin_options: diff --git a/dataset.py b/dataset.py index 0efb2f2f..6c0d1f66 100644 --- a/dataset.py +++ b/dataset.py @@ -12,7 +12,7 @@ class Dataset: def __init__(self, file, model_name="", - max_queries=3000, + max_queries=8000, min_input_tokens=0, max_input_tokens=16000, min_output_tokens=0, @@ -47,7 +47,7 @@ def get_next_n_queries(self, n): def initialize_dataset( filename, model_name="", - max_queries=3000, + max_queries=8000, min_input_tokens=0, max_input_tokens=16000, min_output_tokens=0, diff --git a/load_test.py b/load_test.py index d1ff9da4..fd1336bd 100755 --- a/load_test.py +++ b/load_test.py @@ -14,22 +14,43 @@ import utils -def run_main_process(concurrency, duration, dataset, dataset_q, stop_q): +def run_main_process(rps, duration, dataset, request_q, stop_q): """Run the main process.""" logging.info("Test from main process") - # Initialize the dataset_queue with 4*concurrency requests + start_time = time.time() + end_time = start_time + duration + if rps is not None: + main_loop_rps_mode(dataset, request_q, rps, start_time, end_time) + else: + main_loop_concurrency_mode(dataset, request_q, start_time, end_time) + + logging.info("Timer ended, stopping processes") + + # Signal users to stop sending requests + stop_q.put(None) + + # Empty the dataset queue + while not request_q.empty(): + 
logging.debug("Removing element from request_q") + request_q.get() + + return + +def main_loop_concurrency_mode(dataset, request_q, start_time, end_time, concurrency=4): # FIXME: run_main_process() never passes concurrency; without this parameter the body raises NameError + """Let all users send requests repeatedly until end_time""" + logging.info("Test from main process") + + # Initialize the request_q with 2*concurrency requests for query in dataset.get_next_n_queries(2 * concurrency): - dataset_q.put(query) + request_q.put((None, query)) - start_time = time.time() current_time = start_time - while (current_time - start_time) < duration: - # Keep the dataset queue full for duration - if dataset_q.qsize() < int(0.5*concurrency + 1): + while current_time < end_time: + if request_q.qsize() < int(0.5*concurrency + 1): logging.info("Adding %d entries to dataset queue", concurrency) for query in dataset.get_next_n_queries(concurrency): - dataset_q.put(query) + request_q.put((None, query)) time.sleep(0.1) current_time = time.time() @@ -38,13 +59,40 @@ def run_main_process(concurrency, duration, dataset, dataset_q, stop_q): # Signal users to stop sending requests stop_q.put(None) - # Empty the dataset queue - while not dataset_q.empty(): - logging.debug("Removing element from dataset_q") - dataset_q.get() - - return +def request_schedule_constant_rps(rps, start_time, end_time): + """Yields timestamps for a request schedule with constant RPS""" + interval = 1 / rps + next_req_time = start_time + while next_req_time < end_time: + yield next_req_time + next_req_time = next_req_time + interval + + +# This function should support non-constant RPS in the future +def main_loop_rps_mode(dataset, request_q, rps, start_time, end_time): + """Dispatch requests with constant RPS, via schedule_q""" + req_times = request_schedule_constant_rps(rps, start_time, end_time) + + current_time = time.time() + query = dataset.get_next_n_queries(1)[0] + for next_req_time in req_times: + while next_req_time > current_time: + # Wait or spin until next req needs to be dispatched + sleep_time = 
(next_req_time - current_time) - 0.03 # Sleep until 30ms before next_req_time + if sleep_time > 0: + time.sleep(sleep_time) + # else spin + current_time = time.time() + + logging.info(f"Scheduling request time {next_req_time}") + request_q.put((next_req_time, query)) + + query = dataset.get_next_n_queries(1)[0] + + if current_time >= end_time: + return + def gather_results(results_pipes): """Get the results.""" @@ -57,16 +105,16 @@ def gather_results(results_pipes): return results_list -def exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, code): +def exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, code): """Exit gracefully.""" # Signal users to stop sending requests if stop_q.empty(): stop_q.put(None) - if dataset_q is not None and not dataset_q.empty(): - logging.warning("Removing more elements from dataset_q after gathering results!") - while not dataset_q.empty(): - dataset_q.get() + if request_q is not None and not request_q.empty(): + logging.warning("Removing more elements from request_q after gathering results!") + while not request_q.empty(): + request_q.get() logging.debug("Calling join() on all user processes") for proc in procs: @@ -89,20 +137,22 @@ def main(args): log_reader_thread = logging_utils.init_logging(args.log_level, logger_q) # Create processes and their Users + request_q = mp_ctx.Queue(1) + request_q.cancel_join_thread() stop_q = mp_ctx.Queue(1) - dataset_q = mp_ctx.Queue() + procs = [] results_pipes = [] # Parse config logging.debug("Parsing YAML config file %s", args.config) - concurrency, duration, plugin = 0, 0, None + rps, concurrency, duration, plugin = None, 0, 0, None try: config = utils.yaml_load(args.config) - concurrency, duration, plugin = utils.parse_config(config) + rps, concurrency, duration, plugin = utils.parse_config(config) except Exception as e: logging.error("Exiting due to invalid input: %s", repr(e)) - exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, 1) 
+ exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, 1) try: logging.debug("Creating dataset with configuration %s", config["dataset"]) @@ -110,27 +160,28 @@ def main(args): model_name = config.get("plugin_options", {}).get("model_name", "") dataset = Dataset(model_name=model_name, **config["dataset"]) - logging.debug("Creating %s Users and corresponding processes", concurrency) + logging.info("Creating %s Users and corresponding processes", concurrency) for idx in range(concurrency): send_results, recv_results = mp_ctx.Pipe() + results_pipes.append(recv_results) user = User( idx, - dataset_q=dataset_q, + request_q=request_q, stop_q=stop_q, results_pipe=send_results, plugin=plugin, logger_q=logger_q, log_level=args.log_level, run_duration=duration, + rate_limited=(rps is not None) ) proc = mp_ctx.Process(target=user.run_user_process) procs.append(proc) logging.info("Starting %s", proc) proc.start() - results_pipes.append(recv_results) logging.debug("Running main process") - run_main_process(concurrency, duration, dataset, dataset_q, stop_q) + run_main_process(rps, duration, dataset, request_q, stop_q) results_list = gather_results(results_pipes) @@ -139,13 +190,12 @@ def main(args): # Terminate queues immediately on ^C except KeyboardInterrupt: stop_q.cancel_join_thread() - dataset_q.cancel_join_thread() - exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, 130) + exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, 130) except Exception: logging.exception("Unexpected exception in main process") - exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, 1) + exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, 1) - exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, 0) + exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, 0) if __name__ == "__main__": diff --git a/plugins/caikit_client_plugin.py b/plugins/caikit_client_plugin.py index 
c5cafe3e..c1493a0d 100644 --- a/plugins/caikit_client_plugin.py +++ b/plugins/caikit_client_plugin.py @@ -79,7 +79,6 @@ def request_grpc(self, query, user_id, test_end_time: float=0): result.output_tokens_before_timeout = result.output_tokens result.output_text = response - result.calculate_results() return result def streaming_request_grpc(self, query, user_id, test_end_time: float=0): @@ -113,8 +112,6 @@ def streaming_request_grpc(self, query, user_id, test_end_time: float=0): # TODO: Calculate correct output tokens before test timeout duration for streaming requests result.output_tokens_before_timeout = result.output_tokens - result.calculate_results() - return result def request_http(self, query, user_id): @@ -138,7 +135,6 @@ def request_http(self, query, user_id): result.output_tokens_before_timeout = result.output_tokens result.output_text = response - result.calculate_results() return result def streaming_request_http(self, query, user_id): @@ -171,5 +167,4 @@ def streaming_request_http(self, query, user_id): # TODO: Calculate correct output tokens before test timeout duration for streaming requests result.output_tokens_before_timeout = result.output_tokens - result.calculate_results() return result diff --git a/plugins/dummy_plugin.py b/plugins/dummy_plugin.py index eedf42c1..d80352d3 100644 --- a/plugins/dummy_plugin.py +++ b/plugins/dummy_plugin.py @@ -35,7 +35,6 @@ def request_http(self, query, user_id, test_end_time: float=0): result.end_time = time.time() - result.calculate_results() return result @@ -63,5 +62,4 @@ def streaming_request_http(self, query, user_id, test_end_time: float=0): # TODO: Calculate correct output tokens before test timeout duration for streaming requests result.output_tokens_before_timeout = result.output_tokens - result.calculate_results() return result diff --git a/plugins/hf_tgi_plugin.py b/plugins/hf_tgi_plugin.py index 4bb9edf2..48d084c8 100644 --- a/plugins/hf_tgi_plugin.py +++ b/plugins/hf_tgi_plugin.py @@ -114,5 +114,4 
@@ def streaming_request_http(self, query, user_id, test_end_time: float=0): # TODO: Calculate correct output tokens before test timeout duration for streaming requests result.output_tokens_before_timeout = result.output_tokens - result.calculate_results() return result diff --git a/plugins/openai_plugin.py b/plugins/openai_plugin.py index fa0d7331..3d13daa4 100644 --- a/plugins/openai_plugin.py +++ b/plugins/openai_plugin.py @@ -173,7 +173,6 @@ def request_http(self, query: dict, user_id: int, test_end_time: float = 0): # For non-streaming requests we are keeping output_tokens_before_timeout and output_tokens same. result.output_tokens_before_timeout = result.output_tokens - result.calculate_results() return result @@ -356,5 +355,4 @@ def streaming_request_http(self, query: dict, user_id: int, test_end_time: float if expected_output_tokens and result.output_tokens != expected_output_tokens: logger.warning(f"Received {result.output_tokens} tokens but expected {expected_output_tokens} tokens") - result.calculate_results() return result diff --git a/plugins/tgis_grpc_plugin.py b/plugins/tgis_grpc_plugin.py index 0469be1b..2ef77a5b 100644 --- a/plugins/tgis_grpc_plugin.py +++ b/plugins/tgis_grpc_plugin.py @@ -121,7 +121,6 @@ def make_request(self, query: dict, user_id: int, test_end_time: float = 0): else: result.output_tokens = query["output_tokens"] - result.calculate_results() return result def make_request_stream(self, query: dict, user_id: int, test_end_time: float): @@ -199,5 +198,4 @@ def make_request_stream(self, query: dict, user_id: int, test_end_time: float): logger.warning("Output token count not found in response, using dataset expected output tokens") result.output_tokens = len(tokens) - result.calculate_results() return result diff --git a/result.py b/result.py index 98882e6e..f26f7a9a 100644 --- a/result.py +++ b/result.py @@ -14,10 +14,12 @@ def __init__(self, user_id, input_id, input_tokens=None): self.output_text: Optional[str] = None 
self.output_tokens: Optional[int] = None self.output_tokens_before_timeout: Optional[int] = None + self.scheduled_start_time: Optional[float] = None self.start_time: Optional[float] = None self.ack_time: Optional[float] = None self.first_token_time: Optional[float] = None self.end_time: Optional[float] = None + self.client_wait_time: Optional[float] = None self.response_time: Optional[float] = None self.tt_ack: Optional[float] = None self.ttft: Optional[float] = None @@ -59,3 +61,8 @@ def calculate_results(self): self.tpot = ( self.response_time / self.output_tokens ) # Time per output token in ms + + if self.scheduled_start_time is not None and self.start_time is not None: + self.client_wait_time = ( + self.start_time - self.scheduled_start_time + ) diff --git a/user.py b/user.py index 6920d5e1..1b2e9db9 100644 --- a/user.py +++ b/user.py @@ -11,18 +11,19 @@ class User: def __init__( self, user_id, - dataset_q, + request_q, stop_q, results_pipe, plugin, logger_q, log_level, run_duration, + rate_limited, ): """Initialize object.""" self.user_id = user_id self.plugin = plugin - self.dataset_q = dataset_q + self.request_q = request_q self.stop_q = stop_q self.results_list = [] self.results_pipe = results_pipe @@ -31,21 +32,19 @@ def __init__( # Must get reset in user process to use the logger created in _init_user_process_logging self.logger = logging.getLogger("user") self.run_duration = run_duration + self.rate_limited = rate_limited - def make_request(self, test_end_time=0): + def make_request(self, query, test_end_time=0, req_schedule_time=None): """Make a request.""" - try: - query = self.dataset_q.get(timeout=2) - except queue.Empty: - # if timeout passes, queue.Empty will be thrown - # User should continue to poll for inputs - return None - except ValueError: - self.logger.warn("dataset q does not exist!") - return None self.logger.info("User %s making request", self.user_id) result = self.plugin.request_func(query, self.user_id, test_end_time) + + if 
req_schedule_time is not None: + result.scheduled_start_time = req_schedule_time + + result.calculate_results() + + return result def _init_user_process_logging(self): @@ -59,17 +58,51 @@ def _init_user_process_logging(self): self.logger = logging.getLogger("user") return logging.getLogger("user") + def _user_loop(self, test_end_time): + while self.stop_q.empty(): + try: + _, query = self.request_q.get(timeout=2) + except queue.Empty: + continue + result = self.make_request(query, test_end_time) + if result is not None: + self.results_list.append(result) + + def _rate_limited_user_loop(self, test_end_time): + while self.stop_q.empty(): + try: + req_schedule_time, query = self.request_q.get(timeout=5) + if not self.stop_q.empty(): + break + except queue.Empty: + # if timeout passes, queue.Empty will be thrown + # User should check if stop_q has been set, else poll again + # self.debug.info("User waiting for a request to be scheduled") + continue + + result = self.make_request(query, test_end_time, req_schedule_time=req_schedule_time) + + if result is not None: + self.results_list.append(result) + else: + self.logger.info("Unexpected None result from User.make_request()") + + def run_user_process(self): """Run a process.""" self._init_user_process_logging() + # Waits for all processes to actually be started + while not self.rate_limited and self.request_q.empty(): + time.sleep(0.1) + test_end_time = time.time() + self.run_duration - while self.stop_q.empty(): - result = self.make_request(test_end_time) - # make_request will return None after 2 seconds if dataset_q is empty - # to ensure that users don't get stuck waiting for requests indefinitely - if result is not None: - self.results_list.append(result) + self.logger.info("User %s starting request loop", self.user_id) + + if self.rate_limited: + self._rate_limited_user_loop(test_end_time) + else: + self._user_loop(test_end_time) self.results_pipe.send(self.results_list) diff --git a/utils.py b/utils.py index c5e9bd3b..6c2e53b7 100644 --- a/utils.py +++ b/utils.py @@ -73,8 +73,15 @@ 
logging.info("load_options config: %s", config["load_options"]) load_options = config.get("load_options") - concurrency = load_options.get("concurrency") duration = load_options.get("duration") + if load_options.get("type") == "concurrency": + concurrency = load_options.get("concurrency") + rps = None + elif load_options.get("type") == "rps": + concurrency = load_options.get("concurrency") + rps = load_options.get("rps") + else: + raise ValueError(f"Unknown load_options type {load_options.get('type')}") plugin_type = config.get("plugin") if plugin_type == "openai_plugin": @@ -93,7 +100,7 @@ logging.error("Unknown plugin type %s", plugin_type) raise ValueError(f"Unknown plugin type {plugin_type}") - return concurrency, duration, plugin + return rps, concurrency, duration, plugin def yaml_load(file): @@ -118,7 +125,7 @@ def write_output(config, results_list): logging.warning("Output path %s does not exist, creating it!", path) path.mkdir(parents=True, exist_ok=True) - concurrency, duration, _ = parse_config(config) + rps, concurrency, duration, _ = parse_config(config) outfile_name = output_options.get("file").format( concurrency=concurrency, duration=duration ) @@ -218,6 +225,16 @@ def write_output(config, results_list): f"Total throughput across all users bounded by the test duration: {throughput} tokens / sec, for duration {duration}" ) + if rps is not None: + rps_scheduled = req_count / duration + rps_completed = req_completed_within_test_duration / duration + rps_completed_extend = req_count / full_duration + print(f"Requests per second scheduled: {rps_scheduled}") + print(f"Requests per second completed during run: {rps_completed}") + print(f"Requests per second completed total (incl. after run): {rps_completed_extend}") + average_client_wait_time = df["client_wait_time"].mean() + print(f"Avg. 
client wait time per request: {average_client_wait_time}") + output_obj["summary"]["throughput_full_duration"] = throughput_full_duration output_obj["summary"]["full_duration"] = full_duration output_obj["summary"]["throughput"] = throughput