diff --git a/Containerfile b/Containerfile
new file mode 100644
index 00000000..63e8f05d
--- /dev/null
+++ b/Containerfile
@@ -0,0 +1,18 @@
+FROM registry.fedoraproject.org/fedora:40
+
+ADD . /llm-load-test/
+
+WORKDIR /llm-load-test/
+
+ARG GIT_BRANCH=main
+
+ENV LLM_LOAD_TEST_CONFIG=config.yaml
+ENV LLM_LOAD_TEST_LOG_LEVEL=debug
+
+RUN dnf -y update \
+    && dnf -y install git python3-pip
+
+RUN git switch $GIT_BRANCH
+RUN pip3 install -r requirements.txt
+
+CMD python3 load_test.py -c $LLM_LOAD_TEST_CONFIG -log $LLM_LOAD_TEST_LOG_LEVEL
diff --git a/load_test.py b/load_test.py
index 1b38779a..b49d818b 100644
--- a/load_test.py
+++ b/load_test.py
@@ -153,6 +153,7 @@ def main(args):
     # Parse config
     logging.debug("Parsing YAML config file %s", args.config)
     concurrency, duration, plugin = 0, 0, None
+    config = {}
     try:
         config = utils.yaml_load(args.config)
         concurrency, duration, plugin = utils.parse_config(config)
@@ -212,7 +213,7 @@
 
         results_list = gather_results(results_pipes)
 
-        utils.write_output(config, results_list)
+        plugin.write_output(config, results_list)
 
     except Exception:
         logging.exception("Unexpected exception in main process")
diff --git a/plugins/caikit_client_plugin.py b/plugins/caikit_client_plugin.py
index c5cafe3e..28b00ff2 100644
--- a/plugins/caikit_client_plugin.py
+++ b/plugins/caikit_client_plugin.py
@@ -6,7 +6,7 @@
 from caikit_nlp_client import GrpcClient, HttpClient
 
 from plugins import plugin
-from result import RequestResult
+from result import TextGenRequestResult
 
 urllib3.disable_warnings()
 
@@ -61,7 +61,7 @@ def _parse_args(self, args):
     def request_grpc(self, query, user_id, test_end_time: float=0):
         grpc_client = GrpcClient(self.host, self.port, verify=False)
 
-        result = RequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
 
         result.start_time = time.time()
         response = grpc_client.generate_text(
@@ -85,7 +85,7 @@ def request_grpc(self, query, user_id, test_end_time: float=0):
     def streaming_request_grpc(self, query, user_id, test_end_time: float=0):
         grpc_client = GrpcClient(self.host, self.port, verify=False)
 
-        result = RequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
 
         tokens = []
         result.start_time = time.time()
@@ -120,7 +120,7 @@ def streaming_request_grpc(self, query, user_id, test_end_time: float=0):
     def request_http(self, query, user_id):
         http_client = HttpClient(self.url, verify=False)
 
-        result = RequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
 
         result.start_time = time.time()
 
@@ -144,7 +144,7 @@ def request_http(self, query, user_id):
     def streaming_request_http(self, query, user_id):
         http_client = HttpClient(self.url, verify=False)
 
-        result = RequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
 
         tokens = []
         result.start_time = time.time()
diff --git a/plugins/caikit_embedding_plugin.py b/plugins/caikit_embedding_plugin.py
new file mode 100644
index 00000000..ecc50354
--- /dev/null
+++ b/plugins/caikit_embedding_plugin.py
@@ -0,0 +1,275 @@
+import json
+import logging
+import time
+
+import urllib3
+from caikit_nlp_client import GrpcClient, HttpClient
+from pathlib import Path
+
+import pandas as pd
+
+import utils
+
+from plugins import plugin
+from result import EmbeddingRequestResult
+
+urllib3.disable_warnings()
+
+
+"""
+Example plugin config.yaml:
+
+plugin: "caikit_embedding_plugin"
+plugin_options:
+  interface: "http" # Some plugins like caikit-nlp-client should support grpc/http
+  streaming: True
+  model_name: "Llama-2-7b-hf"
+  host: "https://llama-2-7b-hf-isvc-predictor-dagray-test.apps.modelserving.nvidia.eng.rdu2.redhat.com"
+  port: 443
+"""
+
+logger = logging.getLogger("user")
+
+required_args = ["model_name", "host", "port", "interface", "model_max_input_tokens", "task"]
+
+class CaikitEmbeddingPlugin(plugin.Plugin):
+    def __init__(self, args):
+        self._parse_args(args)
+
+    def _parse_args(self, args):
+        for arg in required_args:
+            if arg not in args:
+                logger.error("Missing plugin arg: %s", arg)
+
+        self.model_name = args["model_name"]
+        self.host = args["host"]
+        self.port = args["port"]
+        self.model_max_input_tokens = args["model_max_input_tokens"]
+        self.save_raw_output = True if "save_raw_output" not in args else args["save_raw_output"]
+        self.only_summary = False if "only_summary" not in args else args["only_summary"]
+        self.batch_size = 1 if "batch_size" not in args else args["batch_size"]
+        self.timeout = 120 if "timeout" not in args else args["timeout"]
+
+        if args["task"] == "embedding":
+            if args["interface"] == "http":
+                self.url = f"{self.host}:{self.port}"
+                self.request_func = self.request_http_embedding
+            else:
+                logger.error("Interface %s not yet implemented for %s", args["interface"], args["task"])
+        elif args["task"] == "sentence_similarity":
+            if args["interface"] == "http":
+                self.url = f"{self.host}:{self.port}"
+                self.request_func = self.request_http_sentence_similarity
+            else:
+                logger.error("Interface %s not yet implemented for %s", args["interface"], args["task"])
+        elif args["task"] == "rerank":
+            if args["interface"] == "http":
+                self.url = f"{self.host}:{self.port}"
+                self.request_func = self.request_http_rerank
+            else:
+                logger.error("Interface %s not yet implemented for %s", args["interface"], args["task"])
+        else:
+            logger.error("Task %s not yet implemented", args["task"])
+
+        if args["task"] != "embedding":
+            if "objects_per_request" in args:
+                self.objects_per_request = args["objects_per_request"]
+            else:
+                self.objects_per_request = 10
+        else:
+            self.objects_per_request = self.batch_size
+
+    def request_grpc_embedding(self, query, user_id, test_end_time: float=0):
+        """
+        Not yet implemented as gRPC functionality is not yet implemented
+        in caikit-nlp-client for the embeddings endpoints
+        """
+        return {}
+
+    def request_http_embedding(self, query, user_id, test_end_time: float=0):
+        http_client = HttpClient(self.host, verify=False)
+
+        result = EmbeddingRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+
+        result.start_time = time.time()
+
+        response = http_client.embedding_tasks(
+            self.model_name,
+            [query["text"] for _ in range(self.batch_size)],
+            parameters={
+                "truncate_input_tokens": self.model_max_input_tokens
+            },
+            timeout=self.timeout
+        )
+
+        logger.debug("Response: %s", json.dumps(response))
+        result.end_time = time.time()
+
+        result.input_tokens = response["input_token_count"]
+        result.input_objects = 1
+        if self.save_raw_output:
+            result.output_object = str([result["data"]["values"] for result in response["results"]])
+
+        result.calculate_results()
+        return result
+
+    def request_http_sentence_similarity(self, query, user_id, test_end_time: float=0):
+        http_client = HttpClient(self.host, verify=False)
+
+        result = EmbeddingRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result.start_time = time.time()
+
+        num_objects = 10
+
+        response = http_client.sentence_similarity_tasks(
+            self.model_name,
+            [query["text"] for _ in range(self.batch_size)],
+            list(query["text"] for _ in range(num_objects)),
+            parameters={
+                "truncate_input_tokens": self.model_max_input_tokens
+            },
+            timeout=self.timeout
+        )
+
+        logger.debug("Response: %s", json.dumps(response))
+        result.end_time = time.time()
+
+        result.input_tokens = response["input_token_count"]
+        result.input_objects = num_objects
+        if self.save_raw_output:
+            result.output_object = str(response)
+
+        result.calculate_results()
+        return result
+
+    def request_http_rerank(self, query, user_id, test_end_time: float=0):
+        http_client = HttpClient(self.host, verify=False)
+
+        result = EmbeddingRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result.start_time = time.time()
+
+        num_objects = 10
+
+        response = http_client.rerank_tasks(
+            self.model_name,
+            [{query["text"]: i} for i in range(num_objects)],
+            [query["text"] for _ in range(self.batch_size)],
+            parameters={
+                "truncate_input_tokens": self.model_max_input_tokens
+            },
+            timeout=self.timeout
+        )
+
+        logger.debug("Response: %s", json.dumps(response))
+        result.end_time = time.time()
+        result.input_tokens = response["input_token_count"]
+        result.input_objects = num_objects
+        if self.save_raw_output:
+            result.output_object = str(response)
+
+        result.input_queries = self.batch_size
+        result.calculate_results()
+        return result
+
+    def write_output(self, config, results_list):
+        """Write output for embedding results"""
+        output_options = config.get("output")
+        output_path = output_options.get("dir")
+
+        logging.info("Writing output to %s", output_path)
+        path = Path(output_path)
+        if not (path.exists() and path.is_dir()):
+            logging.warning("Output path %s does not exist, creating it!", path)
+            path.mkdir(parents=True, exist_ok=True)
+
+        concurrency, duration, _ = utils.parse_config(config)
+        outfile_name = output_options.get("file").format(
+            concurrency=concurrency, duration=duration
+        )
+        outfile = path / Path(outfile_name)
+        results_list = [result.asdict() for result in results_list]
+        if self.only_summary:
+            output_obj = {
+                "config": config,
+                "summary": {},
+            }
+        else:
+            output_obj = {
+                "results": results_list,
+                "config": config,
+                "summary": {},
+            }
+
+        logging.info("Length of results: %d", len(results_list))
+
+        # TODO, should this be output using logging?
+        df = pd.DataFrame(results_list)
+        df.head()
+
+        with pd.option_context("display.max_rows", None, "display.max_columns", None):
+            print(df)
+            print(f"\n---\nFull results in {outfile}. Results summary:")
+
+        error_count = len(df[~df["error_text"].isnull()])
+        req_count = len(df)
+        print(f"Error count: {error_count} of {req_count} total requests")
+
+        # Ignore errors for summary results
+        df = df[df["error_text"].isnull()]
+
+        summary_df = df[
+            [
+                "response_time",
+                "input_tokens",
+                "input_objects",
+                "input_queries",
+            ]
+        ].mean(numeric_only=True)
+        print(summary_df)
+
+        # Embedding requests have no streaming timeout cutoff, so every
+        # non-errored request counts as completed within the test duration
+        req_completed_within_test_duration = len(df)
+
+        # summaries
+        output_obj = utils.get_summary(df, output_obj, "response_time")
+
+        output_obj = utils.get_summary(df, output_obj, "input_tokens")
+
+        output_obj = utils.get_summary(df, output_obj, "input_objects")
+
+        output_obj = utils.get_summary(df, output_obj, "input_queries")
+
+        # CALCULATE REAL DURATION NOT TARGET DURATION
+        true_end = df["end_time"].max()
+        true_start = df["start_time"].min()
+        full_duration = true_end - true_start
+        throughput_full_duration = df["input_tokens"].sum() / full_duration
+        throughput_per_object = df["input_objects"].sum() / full_duration
+        throughput_tokens_per_doc_per_sec = (df["input_tokens"].sum() / df["input_objects"].sum()) / full_duration
+        print(
+            f"Total true throughput across all users: {throughput_full_duration} tokens / sec, for duration {full_duration}"
+        )
+
+        throughput = df["input_tokens"].sum() / duration
+        print(
+            f"Total throughput across all users bounded by the test duration: {throughput} tokens / sec, for duration {duration}"
+        )
+
+        output_obj["summary"]["throughput_full_duration"] = throughput_full_duration
+        output_obj["summary"]["throughput_per_object"] = throughput_per_object
+        output_obj["summary"]["throughput_tokens_per_document_per_second"] = throughput_tokens_per_doc_per_sec
+        output_obj["summary"]["full_duration"] = full_duration
+        output_obj["summary"]["throughput"] = throughput
+        output_obj["summary"]["total_requests"] = req_count
+        output_obj["summary"]["total_tokens"] = df["input_tokens"].sum()
+        output_obj["summary"][
+            "req_completed_within_test_duration"
+        ] = req_completed_within_test_duration
+        output_obj["summary"]["total_failures"] = error_count
+        output_obj["summary"]["failure_rate"] = error_count / req_count * 100
+        output_obj["summary"]["start_time"] = true_start
+
+        json_out = json.dumps(output_obj, cls=utils.customEncoder, indent=2)
+        with outfile.open("w") as f:
+            f.write(json_out)
diff --git a/plugins/dummy_plugin.py b/plugins/dummy_plugin.py
index eedf42c1..07a87a7f 100644
--- a/plugins/dummy_plugin.py
+++ b/plugins/dummy_plugin.py
@@ -1,7 +1,7 @@
 import time
 
 from plugins import plugin
-from result import RequestResult
+from result import TextGenRequestResult
 
 """
 Example plugin config.yaml:
@@ -23,7 +23,7 @@ def _parse_args(self, args):
         self.request_func = self.request_http
 
     def request_http(self, query, user_id, test_end_time: float=0):
-        result = RequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
         result.start_time = time.time()
 
         # Fake response is just the input backwards
@@ -40,7 +40,7 @@ def request_http(self, query, user_id, test_end_time: float=0):
         return result
 
     def streaming_request_http(self, query, user_id, test_end_time: float=0):
-        result = RequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
         result.start_time = time.time()
 
         time.sleep(0.1)
diff --git a/plugins/hf_tgi_plugin.py b/plugins/hf_tgi_plugin.py
index 4bb9edf2..eeb581b8 100644
--- a/plugins/hf_tgi_plugin.py
+++ b/plugins/hf_tgi_plugin.py
@@ -6,7 +6,7 @@
 import urllib3
 
 from plugins import plugin
-from result import RequestResult
+from result import TextGenRequestResult
 
 urllib3.disable_warnings()
 
@@ -48,9 +48,9 @@ def streaming_request_http(self, query, user_id, test_end_time: float=0):
             },
         }
 
-        result = RequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
 
-        tokens = []
+        tokens = []
         response = None
         result.start_time = time.time()
         try:
diff --git a/plugins/openai_plugin.py b/plugins/openai_plugin.py
index 573651b3..ac56aad1 100644
--- a/plugins/openai_plugin.py
+++ b/plugins/openai_plugin.py
@@ -6,7 +6,7 @@
 import urllib3
 
 from plugins import plugin
-from result import RequestResult
+from result import TextGenRequestResult
 
 urllib3.disable_warnings()
 """
@@ -46,7 +46,7 @@ def _parse_args(self, args):
 
     def request_http(self, query: dict, user_id: int, test_end_time: float = 0):
 
-        result = RequestResult(user_id, query.get("text"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("text"), query.get("input_tokens"))
 
         result.start_time = time.time()
 
@@ -145,7 +145,7 @@ def streaming_request_http(self, query: dict, user_id: int, test_end_time: float
         if self.model_name is not None:
             data["model"] = self.model_name
 
-        result = RequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
+        result = TextGenRequestResult(user_id, query.get("input_id"), query.get("input_tokens"))
 
         tokens = []
         response = None
diff --git a/plugins/plugin.py b/plugins/plugin.py
index 15ce314a..ca7aab03 100644
--- a/plugins/plugin.py
+++ b/plugins/plugin.py
@@ -1,3 +1,11 @@
+import json
+import logging
+from pathlib import Path
+
+import pandas as pd
+
+import utils
+
 class Plugin:
     def __init__(self, args):
         self.args = args
@@ -13,3 +21,129 @@ def request_grpc(self, query, user_id):
 
     def streaming_request_grpc(self, query, user_id):
         pass
+
+    def write_output(self, config, results_list):
+        """Write the results."""
+        output_options = config.get("output")
+        output_path = output_options.get("dir")
+
+        logging.info("Writing output to %s", output_path)
+        path = Path(output_path)
+        if not (path.exists() and path.is_dir()):
+            logging.warning("Output path %s does not exist, creating it!", path)
+            path.mkdir(parents=True, exist_ok=True)
+
+        concurrency, duration, _ = utils.parse_config(config)
+        outfile_name = output_options.get("file").format(
+            concurrency=concurrency, duration=duration
+        )
+        outfile = path / Path(outfile_name)
+        results_list = [result.asdict() for result in results_list]
+        output_obj = {
+            "results": results_list,
+            "config": config,
+            "summary": {},
+        }
+
+        logging.info("Length of results: %d", len(results_list))
+
+        # TODO, should this be output using logging?
+        df = pd.DataFrame(results_list)
+        df.head()
+
+        with pd.option_context("display.max_rows", None, "display.max_columns", None):
+            print(df)
+            print(f"\n---\nFull results in {outfile}. Results summary:")
+
+        error_count = len(df[~df["error_text"].isnull()])
+        req_count = len(df)
+        print(f"Error count: {error_count} of {req_count} total requests")
+
+        # Ignore errors for summary results
+        df = df[df["error_text"].isnull()]
+
+        if "ttft" in df:
+            # Streaming
+            summary_df = df[
+                [
+                    "tt_ack",
+                    "ttft",
+                    "itl",
+                    "tpot",
+                    "response_time",
+                    "output_tokens",
+                    "output_tokens_before_timeout",
+                    "input_tokens",
+                ]
+            ].mean(numeric_only=True)
+        else:
+            # Non-streaming, no TTFT or ITL
+            summary_df = df[
+                [
+                    "tpot",
+                    "response_time",
+                    "output_tokens",
+                    "output_tokens_before_timeout",
+                    "input_tokens",
+                ]
+            ].mean(numeric_only=True)
+        print(summary_df)
+
+        # Only consider requests that were completed within the duration of the test for
+        # calculating the summary statistics on tpot, ttft, itl, tt_ack
+        df_test_duration = df[df["output_tokens"] == df["output_tokens_before_timeout"]]
+        req_completed_within_test_duration = len(df_test_duration)
+
+        # Time per output token summary
+        output_obj = utils.get_summary(df_test_duration, output_obj, "tpot")
+
+        if "ttft" in df:
+            # Time to first token summary
+            output_obj = utils.get_summary(df_test_duration, output_obj, "ttft")
+
+            # Inter-token latency summary
+            output_obj = utils.get_summary(df_test_duration, output_obj, "itl")
+
+            # Time to ack summary
+            output_obj = utils.get_summary(df_test_duration, output_obj, "tt_ack")
+
+        # response time summary
+        output_obj = utils.get_summary(df, output_obj, "response_time")
+
+        # output tokens summary
+        output_obj = utils.get_summary(df, output_obj, "output_tokens")
+
+        # output tokens summary
+        output_obj = utils.get_summary(df, output_obj, "output_tokens_before_timeout")
+
+        # input tokens summary
+        output_obj = utils.get_summary(df, output_obj, "input_tokens")
+
+        # CALCULATE REAL DURATION NOT TARGET DURATION
+        true_end = df["end_time"].max()
+        true_start = df["start_time"].min()
+        full_duration = true_end - true_start
+        throughput_full_duration = df["output_tokens"].sum() / full_duration
+        print(
+            f"Total true throughput across all users: {throughput_full_duration} tokens / sec, for duration {full_duration}"
+        )
+
+        throughput = df["output_tokens_before_timeout"].sum() / duration
+        print(
+            f"Total throughput across all users bounded by the test duration: {throughput} tokens / sec, for duration {duration}"
+        )
+
+        output_obj["summary"]["throughput_full_duration"] = throughput_full_duration
+        output_obj["summary"]["full_duration"] = full_duration
+        output_obj["summary"]["throughput"] = throughput
+        output_obj["summary"]["total_requests"] = req_count
+        output_obj["summary"][
+            "req_completed_within_test_duration"
+        ] = req_completed_within_test_duration
+        output_obj["summary"]["total_failures"] = error_count
+        output_obj["summary"]["failure_rate"] = error_count / req_count * 100
+
+        json_out = json.dumps(output_obj, cls=utils.customEncoder, indent=2)
+        with outfile.open("w") as f:
+            f.write(json_out)
+
diff --git a/plugins/tgis_grpc_plugin.py b/plugins/tgis_grpc_plugin.py
index 0469be1b..9cd65b86 100644
--- a/plugins/tgis_grpc_plugin.py
+++ b/plugins/tgis_grpc_plugin.py
@@ -8,7 +8,7 @@
 import generation_pb2_grpc
 
 from plugins import plugin
-from result import RequestResult
+from result import TextGenRequestResult
 
 logger = logging.getLogger("user")
 
@@ -78,7 +78,7 @@ def make_request(self, query: dict, user_id: int, test_end_time: float = 0):
             grpc_channel
         )
 
-        result = RequestResult(
+        result = TextGenRequestResult(
             user_id, query.get("input_id"), query.get("input_tokens")
         )
         request = generation_pb2_grpc.generation__pb2.BatchedGenerationRequest(
@@ -133,7 +133,7 @@ def make_request_stream(self, query: dict, user_id: int, test_end_time: float):
         generation_service_stub = generation_pb2_grpc.GenerationServiceStub(
             grpc_channel
         )
-        result = RequestResult(
+        result = TextGenRequestResult(
             user_id, query.get("input_id"), query.get("input_tokens")
         )
         tokens = []
diff --git a/requirements.txt b/requirements.txt
index 4112b0a3..357918c4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-caikit-nlp-client==0.0.8
+git+https://github.com/drewrip/caikit-nlp-client.git@main
 deprecation
 flake8
 isort
diff --git a/result.py b/result.py
index b320bcfb..78baf5bc 100644
--- a/result.py
+++ b/result.py
@@ -1,8 +1,21 @@
 """Main result class."""
 
-
 class RequestResult:
-    """Request result class."""
+
+    def __init__(self, user_id, input_id):
+        self.user_id = user_id
+        self.input_id = input_id
+
+    def asdict(self):
+        """Return a dictionary."""
+        return vars(self)
+
+    def calculate_results(self):
+        """Calculate the results."""
+        pass
+
+class TextGenRequestResult(RequestResult):
+    """Request result class for text generation tasks."""
 
     def __init__(self, user_id, input_id, input_tokens):
         """Init method."""
@@ -25,12 +38,6 @@ def __init__(self, user_id, input_id, input_tokens):
         self.error_code = None
         self.error_text = None
 
-    def asdict(self):
-        """Return a dictionary."""
-        # Maybe later we will want to only include some fields in the results,
-        # but for now, this just puts all object fields in a dict.
-        return vars(self)
-
     # Fill in calculated fields like response_time, tt_ack, ttft, tpot.
     def calculate_results(self):
         """Calculate the results."""
@@ -53,3 +60,33 @@ def calculate_results(self):
         self.tpot = (
             self.response_time / self.output_tokens
         )  # Time per output token in ms
+
+class EmbeddingRequestResult(RequestResult):
+    """Request result class for embedding tasks."""
+
+    def __init__(self, user_id, input_id, input_tokens):
+        """Init method."""
+        self.user_id = user_id
+        self.input_id = input_id
+        self.input_tokens = input_tokens
+        self.input_objects: int = 1
+        self.input_queries: int = 1
+        self.output_object: str = ""
+        self.start_time = None
+        self.ack_time = None
+        self.end_time = None
+        self.response_time = None
+        self.stop_reason = None
+        self.error_code = None
+        self.error_text = None
+
+    # Fill in calculated fields like response_time and tt_ack.
+    def calculate_results(self):
+        """Calculate the results."""
+        # Only calculate results if response is error-free.
+        if self.error_code is None and self.error_text is None:
+            # response_time in milliseconds
+            self.response_time = 1000 * (self.end_time - self.start_time)
+
+            if self.ack_time is not None:
+                self.tt_ack = 1000 * (self.ack_time - self.start_time)
diff --git a/utils.py b/utils.py
index c5e9bd3b..1bc4eb8e 100644
--- a/utils.py
+++ b/utils.py
@@ -7,11 +7,11 @@
 from pathlib import Path
 
 import numpy as np
-
 import pandas as pd
 
 from plugins import (
     caikit_client_plugin,
+    caikit_embedding_plugin,
     dummy_plugin,
     hf_tgi_plugin,
     openai_plugin,
@@ -83,6 +83,8 @@ def parse_config(config):
         )
     elif plugin_type == "caikit_client_plugin":
         plugin = caikit_client_plugin.CaikitClientPlugin(config.get("plugin_options"))
+    elif plugin_type == "caikit_embedding_plugin":
+        plugin = caikit_embedding_plugin.CaikitEmbeddingPlugin(config.get("plugin_options"))
     elif plugin_type == "tgis_grpc_plugin":
         plugin = tgis_grpc_plugin.TGISGRPCPlugin(config.get("plugin_options"))
     elif plugin_type == "hf_tgi_plugin":
@@ -106,133 +108,6 @@ def yaml_load(file):
     except yaml.YAMLError as exc:
         raise RuntimeError(f"Could not parse {file}") from exc
 
-
-def write_output(config, results_list):
-    """Write the results."""
-    output_options = config.get("output")
-    output_path = output_options.get("dir")
-
-    logging.info("Writing output to %s", output_path)
-    path = Path(output_path)
-    if not (path.exists() and path.is_dir()):
-        logging.warning("Output path %s does not exist, creating it!", path)
-        path.mkdir(parents=True, exist_ok=True)
-
-    concurrency, duration, _ = parse_config(config)
-    outfile_name = output_options.get("file").format(
-        concurrency=concurrency, duration=duration
-    )
-    outfile = path / Path(outfile_name)
-    results_list = [result.asdict() for result in results_list]
-    output_obj = {
-        "results": results_list,
-        "config": config,
-        "summary": {},
-    }
-
-    logging.info("Length of results: %d", len(results_list))
-
-    # TODO, should this be output using logging?
-    df = pd.DataFrame(results_list)
-    df.head()
-
-    with pd.option_context("display.max_rows", None, "display.max_columns", None):
-        print(df)
-        print(f"\n---\nFull results in {outfile}. Results summary:")
-
-    error_count = len(df[~df["error_text"].isnull()])
-    req_count = len(df)
-    print(f"Error count: {error_count} of {req_count} total requests")
-
-    # Ignore errors for summary results
-    df = df[df["error_text"].isnull()]
-
-    if "ttft" in df:
-        # Streaming
-        summary_df = df[
-            [
-                "tt_ack",
-                "ttft",
-                "itl",
-                "tpot",
-                "response_time",
-                "output_tokens",
-                "output_tokens_before_timeout",
-                "input_tokens",
-            ]
-        ].mean(numeric_only=True)
-    else:
-        # Non-streaming, no TTFT or ITL
-        summary_df = df[
-            [
-                "tpot",
-                "response_time",
-                "output_tokens",
-                "output_tokens_before_timeout",
-                "input_tokens",
-            ]
-        ].mean(numeric_only=True)
-    print(summary_df)
-
-    # Only consider requests that were completed within the duration of the test for
-    # calculating the summary statistics on tpot, ttft, itl, tt_ack
-    df_test_duration = df[df["output_tokens"] == df["output_tokens_before_timeout"]]
-    req_completed_within_test_duration = len(df_test_duration)
-
-    # Time per output token summary
-    output_obj = get_summary(df_test_duration, output_obj, "tpot")
-
-    if "ttft" in df:
-        # Time to first token summary
-        output_obj = get_summary(df_test_duration, output_obj, "ttft")
-
-        # Inter-token latency summary
-        output_obj = get_summary(df_test_duration, output_obj, "itl")
-
-        # Time to ack summary
-        output_obj = get_summary(df_test_duration, output_obj, "tt_ack")
-
-    # response time summary
-    output_obj = get_summary(df, output_obj, "response_time")
-
-    # output tokens summary
-    output_obj = get_summary(df, output_obj, "output_tokens")
-
-    # output tokens summary
-    output_obj = get_summary(df, output_obj, "output_tokens_before_timeout")
-
-    # input tokens summary
-    output_obj = get_summary(df, output_obj, "input_tokens")
-
-    # CALCULATE REAL DURATION NOT TARGET DURATION
-    true_end = df["end_time"].max()
-    true_start = df["start_time"].min()
-    full_duration = true_end - true_start
-    throughput_full_duration = df["output_tokens"].sum() / full_duration
-    print(
-        f"Total true throughput across all users: {throughput_full_duration} tokens / sec, for duration {full_duration}"
-    )
-
-    throughput = df["output_tokens_before_timeout"].sum() / duration
-    print(
-        f"Total throughput across all users bounded by the test duration: {throughput} tokens / sec, for duration {duration}"
-    )
-
-    output_obj["summary"]["throughput_full_duration"] = throughput_full_duration
-    output_obj["summary"]["full_duration"] = full_duration
-    output_obj["summary"]["throughput"] = throughput
-    output_obj["summary"]["total_requests"] = req_count
-    output_obj["summary"][
-        "req_completed_within_test_duration"
-    ] = req_completed_within_test_duration
-    output_obj["summary"]["total_failures"] = error_count
-    output_obj["summary"]["failure_rate"] = error_count / req_count * 100
-
-    json_out = json.dumps(output_obj, cls=customEncoder, indent=2)
-    with outfile.open("w") as f:
-        f.write(json_out)
-
-
 def get_summary(df: pd.DataFrame, output_obj: dict, summary_key: str):
     """Get the summary."""
     output_obj["summary"][summary_key] = {}