#27 Fix obj serialization for saving #29

Merged (9 commits) on Oct 29, 2024
1 change: 1 addition & 0 deletions README.md
@@ -70,6 +70,7 @@ Extras available:
 - `hf` to install Hugging Face Transformers dependencies
 - `vllm` to install vLLM dependencies
 - `llamafile` to install Llamafile dependencies
+- `baseten` to install Baseten dependencies

 ## Quick Start

2 changes: 1 addition & 1 deletion flow_judge/eval_data_types.py
@@ -17,7 +17,7 @@
"""Output model for evaluation results."""

feedback: str = Field(..., description="Feedback from the evaluation")
score: int = Field(..., description="Numeric score from the evaluation")
score: int | None = Field(..., description="Numeric score from the evaluation")


     @classmethod
     def parse(cls, response: str, fail_on_parse_error: bool = False) -> "EvalOutput":
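Not part of the diff: a minimal sketch of what the nullable score enables, assuming `EvalOutput` is a Pydantic model (as the `Field(...)` usage above suggests). The "BasetenError" placeholder mirrors the sentinel introduced later in this PR.

```python
from pydantic import BaseModel, Field


class EvalOutput(BaseModel):
    """Output model for evaluation results."""

    feedback: str = Field(..., description="Feedback from the evaluation")
    score: int | None = Field(..., description="Numeric score from the evaluation")


# A failed request can now be represented without inventing a numeric score.
placeholder = EvalOutput(feedback="BasetenError", score=None)
results = [placeholder, EvalOutput(feedback="Looks good", score=4)]

# Downstream consumers can filter on `score is not None` instead of a magic number.
scored = [r for r in results if r.score is not None]
print(len(scored))  # -> 1
```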
108 changes: 82 additions & 26 deletions flow_judge/flow_judge.py
@@ -48,11 +48,18 @@
         else:
             validate_eval_input(eval_inputs, self.metric)

-    def _save_results(self, eval_inputs: list[EvalInput], eval_outputs: list[EvalOutput]):
+    def _save_results(
+        self, eval_inputs: list[EvalInput], eval_outputs: list[EvalOutput], append: bool = False
+    ):
         """Save results to disk."""
-        logger.info(f"Saving results to {self.output_dir}")
+        logger.info(f"{'Appending' if append else 'Saving'} results to {self.output_dir}")
         write_results_to_disk(
-            eval_inputs, eval_outputs, self.model.metadata, self.metric.name, self.output_dir
+            eval_inputs,
+            eval_outputs,
+            self.model.metadata,
+            self.metric.name,
+            self.output_dir,
+            append=append,
         )
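Not part of the diff: a toy illustration of the overwrite-versus-append behaviour the new flag selects. The real `write_results_to_disk` lives elsewhere in this repo; the JSONL layout and file name below are assumptions made for the example.

```python
import json
from pathlib import Path


def toy_write_results(records: list[dict], output_dir: str, append: bool = False) -> None:
    """Open the results file in append or overwrite mode depending on the flag."""
    path = Path(output_dir) / "results.jsonl"  # file name and format are assumptions
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a" if append else "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
```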


@@ -122,8 +129,48 @@
         if not isinstance(model, AsyncBaseFlowJudgeModel):
             raise ValueError("Invalid model type. Use AsyncBaseFlowJudgeModel or its subclasses.")

+    def _handle_batch_result(
+        self, batch_result: BatchResult, batch_len: int, fail_on_parse_error: bool
+    ) -> list[EvalOutput]:
+        """Handle output parsing for batched results from Baseten.

Review comment (Contributor): This is probably for all model instances, not just for Baseten, right?
Reply (Collaborator, author): True, updated!

+        Args:
+            batch_result: The result of the batch from Baseten.
+            batch_len: The initial batch size, derived from the length of the EvalInput list.
+            fail_on_parse_error: Flag to raise a parse error for the EvalOutput.
+
+        Returns:
+            list[EvalOutput]: A list of eval outputs with score and feedback.
+
+        Note:
+            There might be instances when downstream errors result in missing entries
+            for the eval outputs. We implement retry strategies where we can, but in
+            certain instances (such as network failures) errors are inevitable.
+            To keep the results predictable, we 'fill in' the errors with empty EvalOutputs.
+        """
+        eval_outputs = [EvalOutput(feedback="BasetenError", score=None)] * batch_len
+        for output in batch_result.successful_outputs:
+            index = output.get("index")
+            eval_outputs[index - 1] = EvalOutput.parse(
+                response=output["response"], fail_on_parse_error=fail_on_parse_error
+            )

+        # Log all downstream errors
+        if len(batch_result.errors) > 0:
+            logger.warning(
f"Number of Baseten API errors: {len(batch_result.errors)}"
f" of {batch_result.total_requests}."
f" Success rate is {batch_result.success_rate}"
" List of errors: "
)
for error in batch_result.errors:
logger.warning(f"{error.error_type}: {error.error_message}")


+        return eval_outputs
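Not part of the diff: a small worked example of the fill-in strategy used by `_handle_batch_result` above, with plain dictionaries standing in for `EvalOutput` so it runs standalone. The 1-based index is inferred from the `index - 1` lookup in the diff.

```python
# Three prompts were sent; the response with index 2 was lost to a downstream error.
successful_outputs = [
    {"index": 1, "response": "raw response for input 1"},
    {"index": 3, "response": "raw response for input 3"},
]

batch_len = 3
# Pre-fill every slot with an error placeholder, then overwrite the slots that came back.
eval_outputs: list[dict] = [{"feedback": "BasetenError", "score": None}] * batch_len
for output in successful_outputs:
    # The provider's index appears to be 1-based, hence the `- 1`.
    eval_outputs[output["index"] - 1] = {"response": output["response"]}

# The slot for index 2 keeps its placeholder, so output length always matches input length.
assert eval_outputs[1] == {"feedback": "BasetenError", "score": None}
```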


     async def async_evaluate(
-        self, eval_input: EvalInput, save_results: bool = False
+        self, eval_input: EvalInput, save_results: bool = False, append: bool = False
     ) -> EvalOutput | None:
         """Evaluate a single EvalInput object asynchronously."""
         try:
@@ -132,14 +179,16 @@
             result = await self.model._async_generate(prompt)
             response = result

+            # If there are Baseten errors we log & return here.
+            if isinstance(result, FlowJudgeError):
+                logger.error(f" {result.error_type}: {result.error_message}")
+                return

             eval_output = EvalOutput.parse(response)
             if save_results:
-                await asyncio.to_thread(self._save_results, [eval_input], [eval_output])
+                logger.info(f"Saving result {'(append)' if append else '(overwrite)'}")
+                await asyncio.to_thread(
+                    self._save_results, [eval_input], [eval_output], append=append
+                )
             return eval_output
         except Exception as e:
             logger.error(f"Asynchronous evaluation failed: {e}")
@@ -151,36 +200,43 @@
         eval_inputs: list[EvalInput],
         use_tqdm: bool = True,
         save_results: bool = True,
+        append: bool = False,  # Change default to False
         fail_on_parse_error: bool = False,
     ) -> list[EvalOutput]:
         """Batch evaluate a list of EvalInput objects asynchronously."""
         self._validate_inputs(eval_inputs)
         prompts = [self._format_prompt(eval_input) for eval_input in eval_inputs]
         batch_result = await self.model._async_batch_generate(prompts, use_tqdm=use_tqdm)
-        responses = batch_result

-        if isinstance(responses, BatchResult):
-            responses = [result["response"] for result in batch_result.successful_outputs]
-            if len(batch_result.errors) > 0:
-                logger.warning(
-                    f"Number of Baseten API errors: {len(batch_result.errors)}"
-                    f" of {batch_result.total_requests}."
-                    f" Success rate is {batch_result.success_rate}"
-                    "List of errors: "
-                )
-                for error in batch_result.errors:
-                    logger.warning(f"{error.error_type}: {error.error_message}")

-        eval_outputs = [
-            EvalOutput.parse(response, fail_on_parse_error=fail_on_parse_error)
-            for response in responses
-        ]
+        if isinstance(batch_result, BatchResult):
+            eval_outputs = self._handle_batch_result(
+                batch_result=batch_result,
+                batch_len=len(eval_inputs),
+                fail_on_parse_error=fail_on_parse_error,
+            )
+        else:
+            eval_outputs = [
+                EvalOutput.parse(response, fail_on_parse_error=fail_on_parse_error)
+                for response in batch_result
+            ]
+        logger.warning(f"{eval_outputs}")
-        parse_failures = sum(1 for output in eval_outputs if output.score == -1)
+        parse_failures = sum(1 for output in eval_outputs if output.score and output.score == -1)

         if save_results:
-            await asyncio.to_thread(self._save_results, eval_inputs, eval_outputs)
+            logger.info(f"Saving {len(eval_outputs)} results")
+            for i, (eval_input, eval_output) in enumerate(
+                zip(eval_inputs, eval_outputs, strict=True)
+            ):
+                await asyncio.to_thread(
+                    self._save_results,
+                    [eval_input],
+                    [eval_output],
+                    append=(append or i > 0),  # Append for all but the first, unless append is True
+                )

         if parse_failures > 0:
-            logger.warning(f"Number of parsing failures: {parse_failures} out of {len(responses)}")
+            logger.warning(
f"Number of parsing failures: {parse_failures} out of {len(eval_outputs)}"
)

         return eval_outputs
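Not part of the diff: the save loop above honours the caller's `append` flag only for the first result and forces append for every later one, so earlier writes are not clobbered. A stripped-down rendering of that flag logic, with the file I/O replaced by prints:

```python
def save_all(results: list[str], append: bool = False) -> None:
    for i, result in enumerate(results):
        # The first item uses the caller's `append`; every later item must append,
        # otherwise each write would overwrite the one before it.
        effective_append = append or i > 0
        mode = "append" if effective_append else "overwrite"
        print(f"saving {result!r} ({mode})")


save_all(["r1", "r2", "r3"])               # overwrite, append, append
save_all(["r1", "r2", "r3"], append=True)  # append, append, append
```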