-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Add RPS based load option #65
base: main
Are you sure you want to change the base?
Changes from all commits
94dd326
439df72
f0da2c6
f025833
0bfeb9e
f375f55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,22 +14,43 @@ | |||||
import utils | ||||||
|
||||||
|
||||||
def run_main_process(concurrency, duration, dataset, dataset_q, stop_q): | ||||||
def run_main_process(rps, duration, dataset, request_q, stop_q): | ||||||
"""Run the main process.""" | ||||||
logging.info("Test from main process") | ||||||
|
||||||
# Initialize the dataset_queue with 4*concurrency requests | ||||||
start_time = time.time() | ||||||
end_time = start_time + duration | ||||||
if rps is not None: | ||||||
main_loop_rps_mode(dataset, request_q, rps, start_time, end_time) | ||||||
else: | ||||||
main_loop_concurrency_mode(dataset, request_q, start_time, end_time) | ||||||
|
||||||
logging.info("Timer ended, stopping processes") | ||||||
|
||||||
# Signal users to stop sending requests | ||||||
stop_q.put(None) | ||||||
|
||||||
# Empty the dataset queue | ||||||
while not request_q.empty(): | ||||||
logging.debug("Removing element from request_q") | ||||||
request_q.get() | ||||||
|
||||||
return | ||||||
Comment on lines
+37
to
+38
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Drop this return?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't but I wonder if adding a dedicated try-catch exception block in this function worth it. We currently catch all the cascade exceptions with the generic Not suggesting this should be addressed in this PR but a follow-up PR to cleanup our exception handling might be good. |
||||||
|
||||||
def main_loop_concurrency_mode(dataset, request_q, start_time, end_time): | ||||||
"""Let all users send requests repeatedly until end_time""" | ||||||
logging.info("Test from main process") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still need this logging statement here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I'll remove this thanks! |
||||||
|
||||||
# Initialize the request_q with 2*concurrency requests | ||||||
for query in dataset.get_next_n_queries(2 * concurrency): | ||||||
dataset_q.put(query) | ||||||
request_q.put((None, query)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From a clarity perspective I think it would be better to have this be a dict or object. E.g.
Suggested change
or set a field on the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using a field in the query dict is much more elegant! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I like the idea of making it a field in the query dict. |
||||||
|
||||||
start_time = time.time() | ||||||
current_time = start_time | ||||||
while (current_time - start_time) < duration: | ||||||
# Keep the dataset queue full for duration | ||||||
if dataset_q.qsize() < int(0.5*concurrency + 1): | ||||||
while current_time < end_time: | ||||||
if request_q.qsize() < int(0.5*concurrency + 1): | ||||||
logging.info("Adding %d entries to dataset queue", concurrency) | ||||||
for query in dataset.get_next_n_queries(concurrency): | ||||||
dataset_q.put(query) | ||||||
request_q.put((None, query)) | ||||||
time.sleep(0.1) | ||||||
current_time = time.time() | ||||||
|
||||||
|
@@ -38,13 +59,40 @@ def run_main_process(concurrency, duration, dataset, dataset_q, stop_q): | |||||
# Signal users to stop sending requests | ||||||
stop_q.put(None) | ||||||
|
||||||
# Empty the dataset queue | ||||||
while not dataset_q.empty(): | ||||||
logging.debug("Removing element from dataset_q") | ||||||
dataset_q.get() | ||||||
|
||||||
return | ||||||
|
||||||
def request_schedule_constant_rps(rps, start_time, end_time): | ||||||
"""Returns a list of timestamps for request schedule with constant RPS""" | ||||||
interval = 1 / rps | ||||||
next_req_time = start_time | ||||||
while next_req_time < end_time: | ||||||
yield(next_req_time) | ||||||
next_req_time = next_req_time + interval | ||||||
|
||||||
|
||||||
# This function should support non-constant RPS in the future | ||||||
def main_loop_rps_mode(dataset, request_q, rps, start_time, end_time): | ||||||
"""Dispatch requests with constant RPS, via schedule_q""" | ||||||
req_times = request_schedule_constant_rps(rps, start_time, end_time) | ||||||
|
||||||
current_time = time.time() | ||||||
query = dataset.get_next_n_queries(1)[0] | ||||||
for next_req_time in req_times: | ||||||
while next_req_time > current_time: | ||||||
# Wait or spin until next req needs to be dispatched | ||||||
sleep_time = (next_req_time - current_time) - 0.03 # Sleep until 30ms before next_req_time | ||||||
if sleep_time > 0: | ||||||
time.sleep(sleep_time) | ||||||
# else spin | ||||||
current_time = time.time() | ||||||
|
||||||
logging.info(f"Scheduling request time {next_req_time}") | ||||||
request_q.put((next_req_time, query)) | ||||||
|
||||||
query = dataset.get_next_n_queries(1)[0] | ||||||
|
||||||
if current_time >= end_time: | ||||||
return | ||||||
|
||||||
|
||||||
def gather_results(results_pipes): | ||||||
"""Get the results.""" | ||||||
|
@@ -57,16 +105,16 @@ def gather_results(results_pipes): | |||||
return results_list | ||||||
|
||||||
|
||||||
def exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, code): | ||||||
def exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, code): | ||||||
"""Exit gracefully.""" | ||||||
# Signal users to stop sending requests | ||||||
if stop_q.empty(): | ||||||
stop_q.put(None) | ||||||
|
||||||
if dataset_q is not None and not dataset_q.empty(): | ||||||
logging.warning("Removing more elements from dataset_q after gathering results!") | ||||||
while not dataset_q.empty(): | ||||||
dataset_q.get() | ||||||
if request_q is not None and not request_q.empty(): | ||||||
logging.warning("Removing more elements from request_q after gathering results!") | ||||||
while not request_q.empty(): | ||||||
request_q.get() | ||||||
|
||||||
logging.debug("Calling join() on all user processes") | ||||||
for proc in procs: | ||||||
|
@@ -89,48 +137,51 @@ def main(args): | |||||
log_reader_thread = logging_utils.init_logging(args.log_level, logger_q) | ||||||
|
||||||
# Create processes and their Users | ||||||
request_q = mp_ctx.Queue(1) | ||||||
request_q.cancel_join_thread() | ||||||
stop_q = mp_ctx.Queue(1) | ||||||
dataset_q = mp_ctx.Queue() | ||||||
|
||||||
procs = [] | ||||||
results_pipes = [] | ||||||
|
||||||
# Parse config | ||||||
logging.debug("Parsing YAML config file %s", args.config) | ||||||
concurrency, duration, plugin = 0, 0, None | ||||||
rps, concurrency, duration, plugin = None, 0, 0, None | ||||||
try: | ||||||
config = utils.yaml_load(args.config) | ||||||
concurrency, duration, plugin = utils.parse_config(config) | ||||||
rps, concurrency, duration, plugin = utils.parse_config(config) | ||||||
except Exception as e: | ||||||
logging.error("Exiting due to invalid input: %s", repr(e)) | ||||||
exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, 1) | ||||||
exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, 1) | ||||||
|
||||||
try: | ||||||
logging.debug("Creating dataset with configuration %s", config["dataset"]) | ||||||
# Get model_name if set for prompt formatting | ||||||
model_name = config.get("plugin_options", {}).get("model_name", "") | ||||||
dataset = Dataset(model_name=model_name, **config["dataset"]) | ||||||
|
||||||
logging.debug("Creating %s Users and corresponding processes", concurrency) | ||||||
logging.info("Creating %s Users and corresponding processes", concurrency) | ||||||
for idx in range(concurrency): | ||||||
send_results, recv_results = mp_ctx.Pipe() | ||||||
results_pipes.append(recv_results) | ||||||
user = User( | ||||||
idx, | ||||||
dataset_q=dataset_q, | ||||||
request_q=request_q, | ||||||
stop_q=stop_q, | ||||||
results_pipe=send_results, | ||||||
plugin=plugin, | ||||||
logger_q=logger_q, | ||||||
log_level=args.log_level, | ||||||
run_duration=duration, | ||||||
rate_limited=(rps is not None) | ||||||
) | ||||||
proc = mp_ctx.Process(target=user.run_user_process) | ||||||
procs.append(proc) | ||||||
logging.info("Starting %s", proc) | ||||||
proc.start() | ||||||
results_pipes.append(recv_results) | ||||||
|
||||||
logging.debug("Running main process") | ||||||
run_main_process(concurrency, duration, dataset, dataset_q, stop_q) | ||||||
run_main_process(rps, duration, dataset, request_q, stop_q) | ||||||
|
||||||
results_list = gather_results(results_pipes) | ||||||
|
||||||
|
@@ -139,13 +190,12 @@ def main(args): | |||||
# Terminate queues immediately on ^C | ||||||
except KeyboardInterrupt: | ||||||
stop_q.cancel_join_thread() | ||||||
dataset_q.cancel_join_thread() | ||||||
exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, 130) | ||||||
exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, 130) | ||||||
except Exception: | ||||||
logging.exception("Unexpected exception in main process") | ||||||
exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, 1) | ||||||
exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, 1) | ||||||
|
||||||
exit_gracefully(procs, dataset_q, stop_q, logger_q, log_reader_thread, 0) | ||||||
exit_gracefully(procs, request_q, stop_q, logger_q, log_reader_thread, 0) | ||||||
|
||||||
|
||||||
if __name__ == "__main__": | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,7 +79,6 @@ def request_grpc(self, query, user_id, test_end_time: float=0): | |
result.output_tokens_before_timeout = result.output_tokens | ||
result.output_text = response | ||
|
||
result.calculate_results() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if its time to depreciate the caikit_client_plugin? |
||
return result | ||
|
||
def streaming_request_grpc(self, query, user_id, test_end_time: float=0): | ||
|
@@ -113,8 +112,6 @@ def streaming_request_grpc(self, query, user_id, test_end_time: float=0): | |
# TODO: Calculate correct output tokens before test timeout duration for streaming requests | ||
result.output_tokens_before_timeout = result.output_tokens | ||
|
||
result.calculate_results() | ||
|
||
return result | ||
|
||
def request_http(self, query, user_id): | ||
|
@@ -138,7 +135,6 @@ def request_http(self, query, user_id): | |
result.output_tokens_before_timeout = result.output_tokens | ||
result.output_text = response | ||
|
||
result.calculate_results() | ||
return result | ||
|
||
def streaming_request_http(self, query, user_id): | ||
|
@@ -171,5 +167,4 @@ def streaming_request_http(self, query, user_id): | |
# TODO: Calculate correct output tokens before test timeout duration for streaming requests | ||
result.output_tokens_before_timeout = result.output_tokens | ||
|
||
result.calculate_results() | ||
return result |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,6 @@ def request_http(self, query, user_id, test_end_time: float=0): | |
|
||
result.end_time = time.time() | ||
|
||
result.calculate_results() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we do a cleanup we probably should remove this file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this was originally added with the thought that it could be used in some test cases but we may want to remove it depending on how we decide to handle testing (unit tests, e2e tests, etc...) |
||
|
||
return result | ||
|
||
|
@@ -63,5 +62,4 @@ def streaming_request_http(self, query, user_id, test_end_time: float=0): | |
# TODO: Calculate correct output tokens before test timeout duration for streaming requests | ||
result.output_tokens_before_timeout = result.output_tokens | ||
|
||
result.calculate_results() | ||
return result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason you replaced the word
constant
load type withconcurrency
? imo,constant
sounds more closer to Constant Load which is aContinuous stream of requests
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that constant is ambiguous, as RPS can also be constant. My other thinking is that we might later add dynamically changing RPS or dynamically changing concurrency so either RPS or concurrency could be constant or dynamic.