-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Add RPS based load option #65
base: main
Are you sure you want to change the base?
Conversation
bf19e71
to
439df72
Compare
@@ -59,17 +63,48 @@ def _init_user_process_logging(self): | |||
self.logger = logging.getLogger("user") | |||
return logging.getLogger("user") | |||
|
|||
def _user_loop(self, test_end_time): | |||
while self.stop_q.empty(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a thought for the future. What if we have the main process SIGTERM (or SIGUSR1) the subprocesses as a stop message and write a custom signal handler to clean up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah totally, that's long overdue. I was looking into it and should work on it sometime.
load_test.py
Outdated
@@ -89,48 +119,50 @@ def main(args): | |||
log_reader_thread = logging_utils.init_logging(args.log_level, logger_q) | |||
|
|||
# Create processes and their Users | |||
schedule_q = mp_ctx.Queue(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
schedule_q = mp_ctx.Queue(1) | |
schedule_q = mp_ctx.Queue(1) | |
schedule_q.cancel_join_thread() |
Toggle cancel_join_thread()
here to avoid the queue blocking on exit.
for query in dataset.get_next_n_queries(2 * concurrency): | ||
dataset_q.put(query) | ||
request_q.put((None, query)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From a clarity perspective I think it would be better to have this be a dict or object. E.g.
request_q.put((None, query)) | |
request_q.put(dict(query=query, req_time=None)) |
or set a field on the query
dict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a field in the query dict is much more elegant!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I like the idea of making it a field in the query dict.
|
||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drop this return?
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't but I wonder if adding a dedicated try-catch exception block in this function worth it. We currently catch all the cascade exceptions with the generic Exception
class in the main function but it's probably not the cleanest way to handle the exception IMO.
Not suggesting this should be addressed in this PR but a follow-up PR to cleanup our exception handling might be good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor nits and comments but this looks ready to go.
max_sequence_tokens: 2048 | ||
load_options: | ||
type: constant #Future options: loadgen, stair-step | ||
concurrency: 1 | ||
type: rps #Options: concurrency, rps, loadgen, stair-step |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason you replaced the word constant
load type with concurrency
? imo, constant
sounds more closer to Constant Load which is a Continuous stream of requests
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that constant is ambiguous, as RPS can also be constant. My other thinking is that we might later add dynamically changing RPS or dynamically changing concurrency so either RPS or concurrency could be constant or dynamic.
|
||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't but I wonder if adding a dedicated try-catch exception block in this function worth it. We currently catch all the cascade exceptions with the generic Exception
class in the main function but it's probably not the cleanest way to handle the exception IMO.
Not suggesting this should be addressed in this PR but a follow-up PR to cleanup our exception handling might be good.
|
||
def main_loop_concurrency_mode(dataset, request_q, start_time, end_time): | ||
"""Let all users send requests repeatedly until end_time""" | ||
logging.info("Test from main process") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this logging statement here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I'll remove this thanks!
for query in dataset.get_next_n_queries(2 * concurrency): | ||
dataset_q.put(query) | ||
request_q.put((None, query)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I like the idea of making it a field in the query dict.
@@ -79,7 +79,6 @@ def request_grpc(self, query, user_id, test_end_time: float=0): | |||
result.output_tokens_before_timeout = result.output_tokens | |||
result.output_text = response | |||
|
|||
result.calculate_results() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if its time to depreciate the caikit_client_plugin?
@@ -35,7 +35,6 @@ def request_http(self, query, user_id, test_end_time: float=0): | |||
|
|||
result.end_time = time.time() | |||
|
|||
result.calculate_results() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we do a cleanup we probably should remove this file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this was originally added with the thought that it could be used in some test cases but we may want to remove it depending on how we decide to handle testing (unit tests, e2e tests, etc...)
@@ -59,17 +63,48 @@ def _init_user_process_logging(self): | |||
self.logger = logging.getLogger("user") | |||
return logging.getLogger("user") | |||
|
|||
def _user_loop(self, test_end_time): | |||
while self.stop_q.empty(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah totally, that's long overdue. I was looking into it and should work on it sometime.
except queue.Empty: | ||
# if timeout passes, queue.Empty will be thrown | ||
# User should check if stop_q has been set, else poll again | ||
# self.debug.info("User waiting for a request to be scheduled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this line be uncommented?
No description provided.