From f14337956f1a895a9f105a65e4bf67687475836a Mon Sep 17 00:00:00 2001 From: Minaam Shahid <34722968+minaamshahid@users.noreply.github.com> Date: Tue, 29 Oct 2024 10:43:54 +0500 Subject: [PATCH] #27 Fix obj serialization for saving (#29) * fix: serialize obj for saving * fix: add log warning back to steam * fix: quality improvements to result_writer & change to readme for baseten extras & pyproject build ignore img dir * fix: rm non-allowed exclude key from pyproject.toml * feat: vllm pinned & test result writing unit + e2e * fix: equate eval outputs and inputs * fix: handle_batch_results docstring * fix: failing baseten test --------- Co-authored-by: sariola --- README.md | 1 + flow_judge/eval_data_types.py | 2 +- flow_judge/flow_judge.py | 108 +++-- flow_judge/integrations/llama_index.py | 228 ++++++++++- flow_judge/models/adapters/baseten/todos.md | 43 -- flow_judge/models/vllm.py | 5 + flow_judge/utils/result_writer.py | 254 ++++++++++-- pyproject.toml | 2 +- .../integrations/test_llama_index_e2e.py | 185 +++++---- tests/unit/models/test_baseten.py | 8 + tests/unit/utils/test_result_writer.py | 368 ++++++++++++++++++ 11 files changed, 1011 insertions(+), 193 deletions(-) delete mode 100644 flow_judge/models/adapters/baseten/todos.md create mode 100644 tests/unit/utils/test_result_writer.py diff --git a/README.md b/README.md index 239a5f0..599026d 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ Extras available: - `hf` to install Hugging Face Transformers dependencies - `vllm` to install vLLM dependencies - `llamafile` to install Llamafile dependencies +- `baseten` to install Baseten dependencies ## Quick Start diff --git a/flow_judge/eval_data_types.py b/flow_judge/eval_data_types.py index 13347a4..d4d73a3 100644 --- a/flow_judge/eval_data_types.py +++ b/flow_judge/eval_data_types.py @@ -17,7 +17,7 @@ class EvalOutput(BaseModel): """Output model for evaluation results.""" feedback: str = Field(..., description="Feedback from the evaluation") - score: int = Field(..., description="Numeric score from the evaluation") + score: int | None = Field(..., description="Numeric score from the evaluation") @classmethod def parse(cls, response: str, fail_on_parse_error: bool = False) -> "EvalOutput": diff --git a/flow_judge/flow_judge.py b/flow_judge/flow_judge.py index 82a1048..c21355b 100644 --- a/flow_judge/flow_judge.py +++ b/flow_judge/flow_judge.py @@ -48,11 +48,18 @@ def _validate_inputs(self, eval_inputs: EvalInput | list[EvalInput]): else: validate_eval_input(eval_inputs, self.metric) - def _save_results(self, eval_inputs: list[EvalInput], eval_outputs: list[EvalOutput]): + def _save_results( + self, eval_inputs: list[EvalInput], eval_outputs: list[EvalOutput], append: bool = False + ): """Save results to disk.""" - logger.info(f"Saving results to {self.output_dir}") + logger.info(f"{'Appending' if append else 'Saving'} results to {self.output_dir}") write_results_to_disk( - eval_inputs, eval_outputs, self.model.metadata, self.metric.name, self.output_dir + eval_inputs, + eval_outputs, + self.model.metadata, + self.metric.name, + self.output_dir, + append=append, ) @@ -122,8 +129,48 @@ def __init__( if not isinstance(model, AsyncBaseFlowJudgeModel): raise ValueError("Invalid model type. Use AsyncBaseFlowJudgeModel or its subclasses.") + def _handle_batch_result( + self, batch_result: BatchResult, batch_len: int, fail_on_parse_error: bool + ) -> list[EvalOutput]: + """Handle output parsing for batched results. + + Args: + batch_result: The result of the batch from Baseten. 
+ batch_len: The initial batch size derived from the length of Eval Inputs. + fail_on_parse_error: Flag to raise a parse error for the EvalOutput. + + Returns: + list[EvalOutput]: A list of eval outputs with score and feedback. + + Note: + There might be instances when downstream errors result in missing entries + for the eval outputs. We implement retry strategies where we can, but in + certain instances (such as network failures) errors are inevitable. + To ascertain predictability, we 'fill-in' the errors with empty EvalOutputs. + + """ + eval_outputs = [EvalOutput(feedback="BasetenError", score=None)] * batch_len + for output in batch_result.successful_outputs: + index = output.get("index") + eval_outputs[index - 1] = EvalOutput.parse( + response=output["response"], fail_on_parse_error=fail_on_parse_error + ) + + # Log all downstream errors + if len(batch_result.errors) > 0: + logger.warning( + f"Number of Baseten API errors: {len(batch_result.errors)}" + f" of {batch_result.total_requests}." + f" Success rate is {batch_result.success_rate}" + " List of errors: " + ) + for error in batch_result.errors: + logger.warning(f"{error.error_type}: {error.error_message}") + + return eval_outputs + async def async_evaluate( - self, eval_input: EvalInput, save_results: bool = False + self, eval_input: EvalInput, save_results: bool = False, append: bool = False ) -> EvalOutput | None: """Evaluate a single EvalInput object asynchronously.""" try: @@ -132,14 +179,16 @@ async def async_evaluate( result = await self.model._async_generate(prompt) response = result - # If there are Baseten errors we log & return here. if isinstance(result, FlowJudgeError): logger.error(f" {result.error_type}: {result.error_message}") return eval_output = EvalOutput.parse(response) if save_results: - await asyncio.to_thread(self._save_results, [eval_input], [eval_output]) + logger.info(f"Saving result {'(append)' if append else '(overwrite)'}") + await asyncio.to_thread( + self._save_results, [eval_input], [eval_output], append=append + ) return eval_output except Exception as e: logger.error(f"Asynchronous evaluation failed: {e}") @@ -151,36 +200,43 @@ async def async_batch_evaluate( eval_inputs: list[EvalInput], use_tqdm: bool = True, save_results: bool = True, + append: bool = False, # Change default to False fail_on_parse_error: bool = False, ) -> list[EvalOutput]: """Batch evaluate a list of EvalInput objects asynchronously.""" self._validate_inputs(eval_inputs) prompts = [self._format_prompt(eval_input) for eval_input in eval_inputs] batch_result = await self.model._async_batch_generate(prompts, use_tqdm=use_tqdm) - responses = batch_result - - if isinstance(responses, BatchResult): - responses = [result["response"] for result in batch_result.successful_outputs] - if len(batch_result.errors) > 0: - logger.warning( - f"Number of Baseten API errors: {len(batch_result.errors)}" - f" of {batch_result.total_requests}." 
- f" Success rate is {batch_result.success_rate}" - "List of errors: " - ) - for error in batch_result.errors: - logger.warning(f"{error.error_type}: {error.error_message}") - eval_outputs = [ - EvalOutput.parse(response, fail_on_parse_error=fail_on_parse_error) - for response in responses - ] + if isinstance(batch_result, BatchResult): + eval_outputs = self._handle_batch_result( + batch_result=batch_result, + batch_len=len(eval_inputs), + fail_on_parse_error=fail_on_parse_error, + ) + else: + eval_outputs = [ + EvalOutput.parse(response, fail_on_parse_error=fail_on_parse_error) + for response in batch_result + ] + logger.warning(f"{eval_outputs}") + parse_failures = sum(1 for output in eval_outputs if output.score and output.score == -1) - parse_failures = sum(1 for output in eval_outputs if output.score == -1) if save_results: - await asyncio.to_thread(self._save_results, eval_inputs, eval_outputs) + logger.info(f"Saving {len(eval_outputs)} results") + for i, (eval_input, eval_output) in enumerate( + zip(eval_inputs, eval_outputs, strict=True) + ): + await asyncio.to_thread( + self._save_results, + [eval_input], + [eval_output], + append=(append or i > 0), # Append for all but the first, unless append is True + ) if parse_failures > 0: - logger.warning(f"Number of parsing failures: {parse_failures} out of {len(responses)}") + logger.warning( + f"Number of parsing failures: {parse_failures} out of {len(eval_outputs)}" + ) return eval_outputs diff --git a/flow_judge/integrations/llama_index.py b/flow_judge/integrations/llama_index.py index 9e76b05..a7ddff3 100644 --- a/flow_judge/integrations/llama_index.py +++ b/flow_judge/integrations/llama_index.py @@ -14,9 +14,21 @@ class LlamaIndexFlowJudge(BaseEvaluator): - """LlamaIndexFlowJudge is a custom evaluator for LlamaIndex. + """A custom evaluator for LlamaIndex that uses FlowJudge to evaluate RAG system performance. - It uses FlowJudge to evaluate the performance of a rag system. + This class integrates FlowJudge with LlamaIndex's evaluation framework, allowing for + seamless evaluation of retrieval-augmented generation (RAG) systems using custom metrics + and models. + + Attributes: + metric (Metric | CustomMetric): The evaluation metric to be used. + model (AsyncBaseFlowJudgeModel): The model used for evaluation. + output_dir (str): Directory to save evaluation results. + save_results (bool): Whether to save evaluation results to disk. + judge (AsyncFlowJudge): The FlowJudge instance used for evaluation. + + Raises: + ValueError: If invalid metric or model types are provided. """ def __init__( @@ -24,18 +36,29 @@ def __init__( metric: Metric | CustomMetric, model: AsyncBaseFlowJudgeModel, output_dir: str = "output/", - ): - """Initialize the LlamaIndexFlowJudge.""" - if isinstance(metric, (Metric, CustomMetric)): - self.metric = metric - else: + save_results: bool = False, + ) -> None: + """Initialize the LlamaIndexFlowJudge. + + Args: + metric: The evaluation metric to be used. + model: The model used for evaluation. + output_dir: Directory to save evaluation results. Defaults to "output/". + save_results: Whether to save evaluation results to disk. Defaults to False. + + Raises: + ValueError: If invalid metric or model types are provided. + """ + if not isinstance(metric, (Metric, CustomMetric)): raise ValueError("Invalid metric type. Use Metric or CustomMetric.") + self.metric = metric if not isinstance(model, AsyncBaseFlowJudgeModel): raise ValueError("Invalid model type. 
Use AsyncBaseFlowJudgeModel or its subclasses.") - self.model = model + self.output_dir = output_dir + self.save_results = save_results self.judge = AsyncFlowJudge( metric=self.metric, model=self.model, output_dir=self.output_dir @@ -57,16 +80,41 @@ async def aevaluate( contexts: Sequence[str] | None = None, reference: str | None = None, sleep_time_in_seconds: int = 0, + save_results: bool | None = None, **kwargs: Any, ) -> EvaluationResult: - """Evaluate the performance of a model asynchronously.""" - del kwargs # Unused + """Evaluate the performance of a model asynchronously. + + This method evaluates a single query-response pair, optionally considering contexts + and a reference answer. + + Args: + query: The input query to evaluate. + response: The model's response to evaluate. + contexts: Relevant context information for the query. + reference: The reference answer for comparison. + sleep_time_in_seconds: Time to sleep before evaluation (for rate limiting). + save_results: Whether to save this evaluation result. + Overrides instance setting if provided. + **kwargs: Additional keyword arguments. + + Returns: + An EvaluationResult containing the evaluation feedback and score. + + Raises: + ValueError: If required inputs for the metric are not provided. + + Note: + At least one of query, response, contexts, or reference must be provided. + """ await asyncio.sleep(sleep_time_in_seconds) + save_results = save_results if save_results is not None else self.save_results + try: available_data = self._prepare_available_data(query, response, contexts, reference) eval_input = self._create_eval_input(available_data) - eval_output = await self.judge.async_evaluate(eval_input) + eval_output = await self.judge.async_evaluate(eval_input, save_results=save_results) return EvaluationResult( query=query, response=response, @@ -94,8 +142,27 @@ def _prepare_available_data( response: str | None = None, contexts: Sequence[str] | None = None, reference: str | None = None, - ): - available_data = {} + ) -> dict[str, Any]: + """Prepare available data for evaluation. + + This method collects and formats the provided data into a dictionary for further processing. + + Args: + query: The input query. + response: The model's response. + contexts: Relevant context information. + reference: The reference answer. + + Returns: + A dictionary containing the available data. + + Raises: + ValueError: If no data is provided (all arguments are None). + + Note: + Contexts, if provided, are joined into a single string with double newline separators. + """ + available_data: dict[str, Any] = {} if query is not None: available_data["query"] = query if response is not None: @@ -111,7 +178,24 @@ def _prepare_available_data( ) return available_data - def _create_eval_input(self, available_data: dict[str, Any]): + def _create_eval_input(self, available_data: dict[str, Any]) -> EvalInput: + """Create an EvalInput object from available data. + + This method constructs an EvalInput object based on the metric's required inputs and output. + + Args: + available_data: A dictionary containing the available data for evaluation. + + Returns: + An EvalInput object ready for evaluation. + + Raises: + ValueError: If any required input or output for the metric is not available in the data. + + Note: + The method strictly enforces that all required inputs + and outputs for the metric are present. 
+ """ inputs = [] for required_input in self.metric.required_inputs: if required_input not in available_data: @@ -122,9 +206,121 @@ def _create_eval_input(self, available_data: dict[str, Any]): if self.metric.required_output not in available_data: raise ValueError( - f"Required output '{self.metric.required_output}' \ - is not available in this integration" + f"Required output '{self.metric.required_output}' " + "is not available in this integration" ) output = {self.metric.required_output: available_data[self.metric.required_output]} return EvalInput(inputs=inputs, output=output) + + async def aclose(self) -> None: + """Clean up resources asynchronously. + + This method should be called when the evaluator is no longer needed to ensure + proper cleanup of resources, especially for the judge and model components. + + Note: + It checks for the existence of 'aclose' and 'shutdown' methods before calling them, + making it safe to call even if these methods are not implemented in all cases. + """ + if hasattr(self.judge, "aclose") and callable(self.judge.aclose): + await self.judge.aclose() + if hasattr(self.model, "shutdown") and callable(self.model.shutdown): + await asyncio.to_thread(self.model.shutdown) + + async def aevaluate_batch( + self, + queries: list[str], + responses: list[str], + contexts: list[Sequence[str]] | None = None, + references: list[str] | None = None, + sleep_time_in_seconds: int = 0, + save_results: bool | None = None, + **kwargs: Any, + ) -> list[EvaluationResult]: + """Evaluate a batch of query-response pairs asynchronously. + + This method processes multiple query-response pairs in a single batch, which can be more + efficient than evaluating them individually. + + Args: + queries: List of input queries to evaluate. + responses: List of model responses to evaluate. + contexts: List of relevant context information for each query. + references: List of reference answers for comparison. + sleep_time_in_seconds: Time to sleep before evaluation (for rate limiting). + save_results: Whether to save these evaluation results. + Overrides instance setting if provided. + **kwargs: Additional keyword arguments. + + Returns: + A list of EvaluationResult objects, one for each query-response pair. + + Raises: + ValueError: If the lengths of queries and responses don't match, + or if required inputs are missing. + + Note: + - The method logs progress and any errors encountered during batch processing. + - Results are saved only after all evaluations are complete, if save_results is True. + - Failed evaluations are included in the results list with appropriate + error information. 
+ """ + await asyncio.sleep(sleep_time_in_seconds) + + if len(queries) != len(responses): + raise ValueError("The number of queries and responses must be the same.") + + save_results = save_results if save_results is not None else self.save_results + + results = [] + eval_inputs = [] + eval_outputs = [] + logger.info(f"Processing {len(queries)} queries in aevaluate_batch") + + for i, (query, response) in enumerate(zip(queries, responses, strict=True)): + context = contexts[i] if contexts else None + reference = references[i] if references else None + + try: + available_data = self._prepare_available_data(query, response, context, reference) + eval_input = self._create_eval_input(available_data) + eval_inputs.append(eval_input) + + logger.info(f"Evaluating query {i+1}/{len(queries)}") + eval_output = await self.judge.async_evaluate(eval_input, save_results=False) + eval_outputs.append(eval_output) + + logger.info(f"Finished evaluating query {i+1}/{len(queries)}") + results.append( + EvaluationResult( + query=query, + response=response, + contexts=context, + feedback=eval_output.feedback, + score=eval_output.score, + invalid_result=False, + invalid_reason=None, + ) + ) + except Exception as e: + logger.error(f"Evaluation failed for query {i+1}: {e}") + results.append( + EvaluationResult( + query=query, + response=response, + contexts=context, + feedback=None, + score=None, + invalid_result=True, + invalid_reason=str(e), + ) + ) + + if save_results: + await asyncio.to_thread( + self.judge._save_results, eval_inputs, eval_outputs, append=False + ) + + logger.info(f"Collected {len(results)} results") + return results diff --git a/flow_judge/models/adapters/baseten/todos.md b/flow_judge/models/adapters/baseten/todos.md deleted file mode 100644 index 6374403..0000000 --- a/flow_judge/models/adapters/baseten/todos.md +++ /dev/null @@ -1,43 +0,0 @@ - -# RESULT - -- BatchResult - - collect success and errors - -- Errors - - to include the error information or even the error object - -- Abort mission - - when we accumulate enough errors -> we cancel the execution - - therefore we must track some "total" state - -- We have a smart way of controlling the speed at which we run the evaluations - ie. fill the 'executor' - - To control using the primitive from asyncio, the 'semaphore' - -- To check the webhook proxy address that it's resolvable - -> by requesting the token from the proxy that we will use for listening - -> the make_request function does not check for the webhook proxy to be resolvable - -> therefore the control flow must exist so that it's checked pre-execution - -- Setting the retry parameters - Where should it optimally happen? - - -- We don't have the Baseten and Proxy error possibilities / schema, we need to get them - -- We don't properly resolve the webhook proxy and we send requests to Baseten incurring costs -without being certain they will be able to send the results to the webhook ever - -- The token for the being allowed to listen and open the stream, should be requested -before the generation, and hence we should create the token before the generation on the client, -that way we don't need to pass the request id to the proxy - -- https://docs.baseten.co/api-reference/get-async-request-status - - - - -# TODOS - -1. 
Figure if we want to have the parser be passed the fail_on_parse_error flag - flow_judge - AsyncFlowJudge - async_batch_evaluate diff --git a/flow_judge/models/vllm.py b/flow_judge/models/vllm.py index 905aef3..fa95d95 100644 --- a/flow_judge/models/vllm.py +++ b/flow_judge/models/vllm.py @@ -293,6 +293,11 @@ def shutdown(self) -> None: if self.exec_async: self.engine.shutdown_background_loop() + async def aclose(self): + """Asynchronous cleanup method.""" + self.shutdown() + await asyncio.sleep(0.1) # Give a moment for background tasks to clean up + class VllmError(Exception): """Custom exception for Vllm-related errors.""" diff --git a/flow_judge/utils/result_writer.py b/flow_judge/utils/result_writer.py index aa58ab2..cee719d 100644 --- a/flow_judge/utils/result_writer.py +++ b/flow_judge/utils/result_writer.py @@ -1,8 +1,12 @@ import json import logging -import os import re from datetime import datetime, timezone +from enum import Enum +from pathlib import Path +from typing import Any + +from pydantic import BaseModel import flow_judge from flow_judge.eval_data_types import EvalInput, EvalOutput @@ -13,55 +17,233 @@ def write_results_to_disk( eval_inputs: list[EvalInput], eval_outputs: list[EvalOutput], - model_metadata: dict, + model_metadata: dict[str, Any], metric_name: str, - output_dir: str, -): + output_dir: str | Path, + append: bool = False, +) -> None: """Write evaluation results, inputs, and metadata to separate JSONL files. - Warning: - The `eval_inputs` and `eval_outputs` lists must have the same length. - If they don't, a ValueError will be raised during the writing process. - """ - fmt_metric_name = re.sub(r"\s", "_", re.sub(r"\(|\)", "", metric_name.lower())) - fmt_model_id = model_metadata["model_id"].replace("/", "__") - timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S.%f")[:-3] - metadata = { - "library_version": f"{flow_judge.__version__}", - "timestamp": timestamp, - **model_metadata, - } + This function processes evaluation data and writes it to disk in a structured format. + It creates separate files for metadata and results, organizing them in directories + based on the metric name and model ID. - metric_folder = os.path.join(output_dir, fmt_metric_name) - metadata_folder = os.path.join(metric_folder, f"metadata_{fmt_metric_name}_{fmt_model_id}") + Args: + eval_inputs: List of evaluation inputs. + eval_outputs: List of evaluation outputs. + model_metadata: Dictionary containing model metadata. + metric_name: Name of the metric being evaluated. + output_dir: Directory to write output files. + append: If True, append results to existing file. If False, overwrite. Default is False. - # Create all necessary directories - os.makedirs(metadata_folder, exist_ok=True) + Raises: + ValueError: If inputs are invalid, empty, or lists have different lengths. + KeyError: If required keys are missing from model_metadata. + OSError: If there are file system related errors during writing. + Note: + - Ensures eval_inputs and eval_outputs have the same length. + - Creates necessary directories if they don't exist. + - Handles special characters in metric_name and model_id for file naming. + - Overwrites existing files with the same name without warning. 
+ """ + _validate_inputs(eval_inputs, eval_outputs, model_metadata, metric_name) + + fmt_metric_name = _format_name(metric_name) + fmt_model_id = _format_name(model_metadata["model_id"]) + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S.%f")[:-3] base_filename = f"{fmt_metric_name}_{fmt_model_id}_{model_metadata['model_type']}_{timestamp}" - metadata_path = os.path.join(metadata_folder, f"metadata_{base_filename}.json") - results_path = os.path.join(metric_folder, f"results_{base_filename}.jsonl") + paths = _prepare_file_paths(output_dir, fmt_metric_name, fmt_model_id, base_filename) + metadata = _prepare_metadata(model_metadata, timestamp) - # Write metadata file try: - with open(metadata_path, "w", encoding="utf-8") as f: - f.write(json.dumps(metadata) + "\n") - except OSError as e: - logger.error(f"Error writing metadata file: {e}") - raise + _write_json_file(paths["metadata"], metadata) - # Write results file - try: - with open(results_path, "w", encoding="utf-8") as f: - for input_data, eval_output in zip(eval_inputs, eval_outputs, strict=True): + mode = "a" if append else "w" + with paths["results"].open(mode, encoding="utf-8") as f: + for eval_input, eval_output in zip(eval_inputs, eval_outputs, strict=True): result = { - "sample": input_data.model_dump(), + "sample": eval_input.model_dump(), "feedback": eval_output.feedback, "score": eval_output.score, } - f.write(json.dumps(result) + "\n") + f.write(json.dumps(result, ensure_ascii=False) + "\n") + + logger.info(f"Results {'appended to' if append else 'saved to'} {paths['results']}") except OSError as e: - logger.error(f"Error writing results file: {e}") + logger.error(f"Error writing files: {e}") raise - logger.info(f"Results saved to {results_path}") + +def _validate_inputs( + eval_inputs: list[EvalInput], + eval_outputs: list[EvalOutput], + model_metadata: dict[str, Any], + metric_name: str, +) -> None: + """Validate input parameters for the write_results_to_disk function. + + Args: + eval_inputs: List of evaluation inputs. + eval_outputs: List of evaluation outputs. + model_metadata: Dictionary containing model metadata. + metric_name: Name of the metric being evaluated. + + Raises: + ValueError: If eval_inputs or eval_outputs are empty, have different lengths, + or if metric_name is empty or only whitespace. + KeyError: If required keys ('model_id', 'model_type') are missing from + model_metadata. + + Note: + This function does not validate the content of eval_inputs or eval_outputs, + only their presence and length. + """ + if not eval_inputs or not eval_outputs: + raise ValueError("eval_inputs and eval_outputs cannot be empty") + if len(eval_inputs) != len(eval_outputs): + raise ValueError("eval_inputs and eval_outputs must have the same length") + if not metric_name or not metric_name.strip(): + raise ValueError("metric_name cannot be empty or only whitespace") + required_keys = {"model_id", "model_type"} + missing_keys = required_keys - set(model_metadata.keys()) + if missing_keys: + raise KeyError(f"model_metadata missing required keys: {missing_keys}") + + +def _format_name(name: str) -> str: + """Format a name for use in file paths by removing special characters. + + Args: + name: The name to format. + + Returns: + A formatted string safe for use in file paths. + + Note: + This function replaces spaces with underscores, removes non-alphanumeric + characters (except underscore and hyphen), and replaces non-ASCII + characters with underscores. 
+ """ + # Replace spaces with underscores + name = name.replace(" ", "_") + # Remove any character that is not alphanumeric, underscore, or hyphen + name = re.sub(r"[^\w\-]", "", name) + # Replace any non-ASCII character with underscore + name = re.sub(r"[^\x00-\x7F]", "_", name) + return name + + +def _prepare_file_paths( + output_dir: str | Path, + fmt_metric_name: str, + fmt_model_id: str, + base_filename: str, +) -> dict[str, Path]: + """Prepare file paths for metadata and results files. + + Args: + output_dir: Base output directory. + fmt_metric_name: Formatted metric name. + fmt_model_id: Formatted model ID. + base_filename: Base filename for output files. + + Returns: + A dictionary containing paths for metadata and results files. + + Note: + This function creates the necessary directories if they don't exist. + It does not check if the resulting file paths already exist. + """ + output_dir = Path(output_dir) + metric_folder = output_dir / fmt_metric_name + metadata_folder = metric_folder / f"metadata_{fmt_metric_name}_{fmt_model_id}" + metadata_folder.mkdir(parents=True, exist_ok=True) + + return { + "metadata": metadata_folder / f"metadata_{base_filename}.json", + "results": metric_folder / f"results_{base_filename}.jsonl", + } + + +def _prepare_metadata(model_metadata: dict[str, Any], timestamp: str) -> dict[str, Any]: + """Prepare metadata dictionary for writing. + + Args: + model_metadata: Dictionary containing model metadata. + timestamp: Timestamp string. + + Returns: + A dictionary containing prepared metadata. + + Note: + - Adds 'library_version' and 'timestamp' to the metadata. + - Converts Pydantic BaseModel instances to dictionaries. + - Converts Enum instances to their values. + - Does not deep copy the input model_metadata. + """ + metadata = { + "library_version": f"{flow_judge.__version__}", + "timestamp": timestamp, + **model_metadata, + } + for key, item in metadata.items(): + if isinstance(item, BaseModel): + metadata[key] = item.model_dump() + elif isinstance(item, Enum): + metadata[key] = item.value + return metadata + + +def _write_json_file(path: Path, data: dict[str, Any]) -> None: + """Write data to a JSON file. + + Args: + path: Path to the output file. + data: Data to write to the file. + + Raises: + OSError: If there's an error writing to the file. + + Note: + - Uses UTF-8 encoding. + - Overwrites the file if it already exists. + - Ensures non-ASCII characters are preserved in the output. + """ + with path.open("w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +def _write_results_file( + path: Path, eval_inputs: list[EvalInput], eval_outputs: list[EvalOutput], append: bool = False +) -> None: + """Write results to a JSONL file. + + Args: + path: Path to the output file. + eval_inputs: List of evaluation inputs. + eval_outputs: List of evaluation outputs. + append: If True, append to the file. If False, overwrite. Default is False. + + Raises: + OSError: If there's an error writing to the file. + ValueError: If eval_inputs and eval_outputs have different lengths. + + Note: + - Uses UTF-8 encoding. + - Appends to the file if append is True, otherwise overwrites. + - Each line in the file is a JSON object representing one result. + - Ensures non-ASCII characters are preserved in the output. 
+ """ + if len(eval_inputs) != len(eval_outputs): + raise ValueError("eval_inputs and eval_outputs must have the same length") + + mode = "a" if append else "w" + with path.open(mode, encoding="utf-8") as f: + for input_data, eval_output in zip(eval_inputs, eval_outputs, strict=True): + result = { + "sample": input_data.model_dump(), + "feedback": eval_output.feedback, + "score": eval_output.score, + } + f.write(json.dumps(result, ensure_ascii=False) + "\n") diff --git a/pyproject.toml b/pyproject.toml index d375d56..2096d65 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ hf = [ "bitsandbytes>=0.41.0,<=0.42.0", "accelerate>=0.34.2", ] -vllm = ["vllm>=0.6.2"] +vllm = ["vllm==0.6.2"] llamafile = [ "torch>=2.3.0", "openai>=1.51.0", diff --git a/tests/e2e-local/integrations/test_llama_index_e2e.py b/tests/e2e-local/integrations/test_llama_index_e2e.py index 35dc823..65a15e4 100644 --- a/tests/e2e-local/integrations/test_llama_index_e2e.py +++ b/tests/e2e-local/integrations/test_llama_index_e2e.py @@ -1,13 +1,13 @@ +import asyncio +import json import logging import os -import statistics import tempfile from collections import Counter from pathlib import Path import pytest from llama_index.core import VectorStoreIndex -from llama_index.core.evaluation import BatchEvalRunner from llama_index.core.llama_dataset import download_llama_dataset from flow_judge.integrations.llama_index import LlamaIndexFlowJudge @@ -231,7 +231,9 @@ async def test_correctness_evaluation( quantized=True, download_dir=str(test_cache_dir), ) - flow_judge_evaluator = LlamaIndexFlowJudge(model=model, metric=correctness_metric) + flow_judge_evaluator = LlamaIndexFlowJudge( + model=model, metric=correctness_metric, save_results=True + ) result = await flow_judge_evaluator.aevaluate( query=query, reference=reference, response=response ) @@ -280,89 +282,132 @@ def compare_distributions( @pytest.mark.asyncio -async def test_batch_evaluation(correctness_metric, query, reference, test_cache_dir): - """Performs a batch evaluation of queries using LlamaIndexFlowJudge and analyzes results. +async def test_batch_evaluation(correctness_metric, query, reference, response, test_cache_dir): + """Performs a batch evaluation of queries using LlamaIndexFlowJudge and analyzes results.""" + os.environ["HF_HOME"] = str(test_cache_dir) + model = None + flow_judge_correctness = None + + try: + logging.info("Starting test_batch_evaluation") + + logging.info("Initializing Vllm model") + model = Vllm( + exec_async=True, + gpu_memory_utilization=0.5, + quantized=True, + download_dir=str(test_cache_dir), + ) + logging.info("Vllm model initialized") + + logging.info("Initializing LlamaIndexFlowJudge") + flow_judge_correctness = LlamaIndexFlowJudge( + model=model, + metric=correctness_metric, + output_dir=str(test_cache_dir), + save_results=True, + ) + logging.info("LlamaIndexFlowJudge initialized") - Args: - correctness_metric (CustomMetric): The metric used for evaluation. - query (str): A sample query (not used directly in this function). - reference (str): A sample reference answer (not used directly in this function). - test_cache_dir (Path): Temp dir that is sure to be okay during testing + logging.info("Downloading and preparing dataset") + rag_dataset, documents = await asyncio.to_thread( + download_llama_dataset, "MiniTruthfulQADataset", "./data/mini_truthful_qa" + ) + logging.info("Dataset prepared") - Raises: - AssertionError: If the evaluation results do not meet expected criteria. 
- """ - os.environ["HF_HOME"] = str(test_cache_dir) - model = Vllm( - exec_async=True, - gpu_memory_utilization=0.5, - quantized=True, - download_dir=str(test_cache_dir), - ) - logging.info("Starting test_batch_evaluation") + logging.info("Creating index and query engine") + index = await asyncio.to_thread(VectorStoreIndex.from_documents, documents=documents) + _ = index.as_query_engine() + logging.info("Index and query engine created") - flow_judge_correctness = LlamaIndexFlowJudge(model=model, metric=correctness_metric) + rag_subset = rag_dataset.examples[:10] + queries = [example.query for example in rag_subset] + references = [example.reference_answer for example in rag_subset] - # Download and prepare the dataset - rag_dataset, documents = download_llama_dataset( - "MiniTruthfulQADataset", "./data/mini_truthful_qa" - ) + # Use the fixtures to create a batch of 3 + queries = [query] * 3 + references = [reference] * 3 + responses = [response] * 3 - # Create the index and query engine - index = VectorStoreIndex.from_documents(documents=documents) # FIXME: depends on OpenAI - query_engine = index.as_query_engine() # FIXME: depends on OpenAI + logging.info(f"Starting evaluation of {len(queries)} queries") + eval_results = await flow_judge_correctness.aevaluate_batch( + queries=queries, responses=responses, references=references, save_results=True + ) + logging.info(f"Finished evaluation, got {len(eval_results)} results") - # Prepare queries and references - rag_subset = rag_dataset.examples[:10] - queries = [example.query for example in rag_subset] - references = [example.reference_answer for example in rag_subset] + # Check results + assert len(eval_results) == len(queries) + for result in eval_results: + assert result.score is not None + assert result.feedback is not None - logging.info(f"Evaluating {len(queries)} queries") + # Verify results were written to disk + expected_metric_folder = test_cache_dir / correctness_metric.name + expected_results_file = next(expected_metric_folder.glob("results_*.jsonl"), None) - evaluators = {"correctness": flow_judge_correctness} + assert ( + expected_results_file is not None + ), f"No results file found in {expected_metric_folder}" - async def batch_eval_runner(evaluators, query_engine, questions, reference=None, num_workers=2): - batch_runner = BatchEvalRunner(evaluators, workers=num_workers, show_progress=True) + # Read and verify the results file + with expected_results_file.open("r") as f: + written_results = [json.loads(line) for line in f] - eval_results = await batch_runner.aevaluate_queries( - query_engine, queries=questions, reference=reference - ) + logging.info("Full content of written_results:") + logging.info(json.dumps(written_results, indent=2)) - return eval_results + assert len(written_results) == len( + queries + ), f"Expected {len(queries)} results, but found {len(written_results)}" - eval_results = await batch_eval_runner( - evaluators=evaluators, query_engine=query_engine, questions=queries, reference=references - ) + for i, result in enumerate(written_results): + logging.info(f"Validating result {i+1}") + assert "sample" in result, f"Result {i+1} is missing 'sample' key" + assert "inputs" in result["sample"], f"Result {i+1} is missing 'inputs' key in 'sample'" - # Check results - assert "correctness" in eval_results - for key in eval_results: - assert len(eval_results[key]) == len(queries) - for result in eval_results[key]: - assert result.score is not None - assert result.feedback is not None + evaluation = 
result["sample"]["inputs"] + logging.info(f"Evaluation {i+1} structure:") + logging.info(json.dumps(evaluation, indent=2)) + + assert ( + len(evaluation) == 2 + ), f"Expected 2 input components in result {i+1}, but found {len(evaluation)}" + assert ( + "query" in evaluation[0] + ), f"First input component in result {i+1} should be the query" + assert ( + "reference" in evaluation[1] + ), f"Second input component in result {i+1} should be the reference" + + assert ( + evaluation[0]["query"] == queries[i] + ), f"Query in result {i+1} doesn't match the input query" + + assert "feedback" in result, f"Result {i+1} is missing 'feedback'" + assert "score" in result, f"Result {i+1} is missing 'score'" + assert isinstance( + result["score"], (int, float) + ), f"Score in result {i+1} should be a number" - # Calculate score distribution - scores = [result.score for result in eval_results["correctness"]] - actual_distribution = get_scores_distribution(scores) - logging.info(f"Actual score distribution: {actual_distribution}") + logging.info(f"Results successfully written to and verified in {expected_results_file}") - # Calculate average score - average_score = statistics.mean(scores) - logging.info(f"Average score: {average_score:.2f}") + logging.info("Results processing and verification completed") - # Assert that the average score is within an acceptable range - assert ( - 2.5 <= average_score <= 4.5 - ), f"Average score {average_score:.2f} is outside the expected range of 2.5 to 4.5" + finally: + if model: + await model.aclose() - # Check that we have a variety of scores - unique_scores = set(scores) - assert ( - len(unique_scores) >= 3 - ), f"Expected at least 3 different score values, but got {len(unique_scores)}" + if flow_judge_correctness and hasattr(flow_judge_correctness, "aclose"): + await flow_judge_correctness.aclose() + + # Allow time for background tasks to complete + await asyncio.sleep(1) + + # Cancel any remaining tasks + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + [task.cancel() for task in tasks] + + # Wait for all tasks to be cancelled + await asyncio.gather(*tasks, return_exceptions=True) - # Log the actual distribution for reference - logging.info(f"Actual distribution: {actual_distribution}") logging.info("test_batch_evaluation completed successfully") - del model diff --git a/tests/unit/models/test_baseten.py b/tests/unit/models/test_baseten.py index 97b9df0..4a349d1 100644 --- a/tests/unit/models/test_baseten.py +++ b/tests/unit/models/test_baseten.py @@ -361,6 +361,14 @@ async def test_baseten_init_valid( # Test asynchronous initialization without webhook secret monkeypatch.delenv("BASETEN_WEBHOOK_SECRET", raising=False) + with ( + patch("flow_judge.models.baseten.get_deployed_model_id", return_value="mock_model_id"), + patch("flow_judge.models.baseten.ensure_model_deployment", return_value=True), + patch( + "flow_judge.models.baseten.ensure_baseten_webhook_secret", + return_value=False, + ), + ): with pytest.raises( BasetenError, match=( diff --git a/tests/unit/utils/test_result_writer.py b/tests/unit/utils/test_result_writer.py new file mode 100644 index 0000000..86b778c --- /dev/null +++ b/tests/unit/utils/test_result_writer.py @@ -0,0 +1,368 @@ +import json +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import pytest + +from flow_judge.eval_data_types import EvalInput, EvalOutput +from flow_judge.utils.result_writer import ( + _format_name, + _prepare_file_paths, + 
_prepare_metadata, + _validate_inputs, + _write_json_file, + _write_results_file, + write_results_to_disk, +) + + +@pytest.fixture +def sample_eval_inputs() -> list[EvalInput]: + """Fixture providing sample EvalInput objects. + + :return: A list of EvalInput objects for testing. + :rtype: List[EvalInput] + """ + return [ + EvalInput(inputs=[{"prompt": f"prompt_{i}"}], output={"response": f"response_{i}"}) + for i in range(3) + ] + + +@pytest.fixture +def sample_eval_outputs() -> list[EvalOutput]: + """Fixture providing sample EvalOutput objects. + + :return: A list of EvalOutput objects for testing. + :rtype: List[EvalOutput] + """ + return [EvalOutput(feedback=f"feedback_{i}", score=i) for i in range(3)] + + +@pytest.fixture +def sample_model_metadata() -> dict[str, Any]: + """Fixture providing sample model metadata. + + :return: A dictionary containing sample model metadata. + :rtype: Dict[str, Any] + """ + return {"model_id": "test_model", "model_type": "test_type"} + + +@pytest.fixture +def mock_datetime(monkeypatch: pytest.MonkeyPatch) -> None: + """Fixture to mock datetime for consistent timestamps. + + This fixture ensures that all datetime.now() calls within the tested functions + return a consistent timestamp, allowing for predictable file naming and content. + + :param monkeypatch: pytest's monkeypatch fixture for mocking. + :type monkeypatch: pytest.MonkeyPatch + """ + + class MockDatetime: + @staticmethod + def now(tz: timezone | None = None) -> datetime: + return datetime(2023, 1, 1, tzinfo=timezone.utc) + + monkeypatch.setattr("flow_judge.utils.result_writer.datetime", MockDatetime) + + +def test_write_results_to_disk( + tmp_path: Path, + sample_eval_inputs: list[EvalInput], + sample_eval_outputs: list[EvalOutput], + sample_model_metadata: dict[str, Any], + mock_datetime: None, +) -> None: + """Test write_results_to_disk function with valid inputs. + + This test verifies the correct behavior of the write_results_to_disk function + when provided with valid inputs. It checks for proper file creation, naming + conventions, and content integrity. + + Critical aspects tested: + - Correct file structure and naming conventions are followed. + - Both metadata and results files are created with the expected content. + - Timestamp consistency is maintained across file names and content. + + :param tmp_path: Temporary directory path provided by pytest. + :type tmp_path: Path + :param sample_eval_inputs: List of sample evaluation inputs. + :type sample_eval_inputs: List[EvalInput] + :param sample_eval_outputs: List of sample evaluation outputs. + :type sample_eval_outputs: List[EvalOutput] + :param sample_model_metadata: Dictionary of sample model metadata. + :type sample_model_metadata: Dict[str, Any] + :param mock_datetime: Mocked datetime to ensure consistent timestamps. 
+ :type mock_datetime: None + """ + metric_name = "test_metric" + write_results_to_disk( + sample_eval_inputs, sample_eval_outputs, sample_model_metadata, metric_name, tmp_path + ) + + expected_metric_folder = tmp_path / "test_metric" + expected_metadata_folder = expected_metric_folder / "metadata_test_metric_test_model" + expected_metadata_file = ( + expected_metadata_folder + / "metadata_test_metric_test_model_test_type_2023-01-01T00-00-00.000.json" + ) + expected_results_file = ( + expected_metric_folder + / "results_test_metric_test_model_test_type_2023-01-01T00-00-00.000.jsonl" + ) + + assert expected_metadata_file.exists() + assert expected_results_file.exists() + + with expected_metadata_file.open() as f: + metadata = json.load(f) + assert metadata["model_id"] == "test_model" + assert metadata["model_type"] == "test_type" + + with expected_results_file.open() as f: + results = [json.loads(line) for line in f] + assert len(results) == 3 + assert all( + "sample" in result and "feedback" in result and "score" in result for result in results + ) + + +def test_prepare_file_paths(tmp_path: Path) -> None: + """Test _prepare_file_paths function. + + This test ensures that the _prepare_file_paths function correctly generates + the expected file paths and creates necessary directories. + + Critical aspects tested: + - Correct path structure for metadata and results files. + - Directory creation for metadata files. + - Proper naming conventions for generated file paths. + + :param tmp_path: Temporary directory path provided by pytest. + :type tmp_path: Path + """ + paths = _prepare_file_paths(tmp_path, "test_metric", "test_model", "test_base") + assert "metadata" in paths and "results" in paths + assert paths["metadata"].parent.exists() + assert paths["metadata"].name.startswith("metadata_test_base") + assert paths["results"].name.startswith("results_test_base") + + +def test_prepare_metadata(sample_model_metadata: dict[str, Any]) -> None: + """Test _prepare_metadata function. + + This test verifies that the _prepare_metadata function correctly prepares + the metadata dictionary with all required fields. + + Critical aspects tested: + - Inclusion of library_version in the metadata. + - Correct timestamp assignment. + - Preservation of original model metadata. + + :param sample_model_metadata: Dictionary of sample model metadata. + :type sample_model_metadata: Dict[str, Any] + """ + timestamp = "2023-01-01T00:00:00.000Z" + metadata = _prepare_metadata(sample_model_metadata, timestamp) + assert "library_version" in metadata + assert metadata["timestamp"] == timestamp + assert metadata["model_id"] == "test_model" + assert metadata["model_type"] == "test_type" + + +def test_write_json_file(tmp_path: Path) -> None: + """Test _write_json_file function. + + This test ensures that the _write_json_file function correctly writes + JSON data to a file, including proper handling of unicode characters. + + Critical aspects tested: + - Successful file creation. + - Correct JSON content writing, including unicode characters. + - Proper file encoding (UTF-8). + + :param tmp_path: Temporary directory path provided by pytest. 
+ :type tmp_path: Path + """ + test_data = {"key": "value", "unicode": "测试"} + test_file = tmp_path / "test.json" + _write_json_file(test_file, test_data) + + assert test_file.exists() + with test_file.open(encoding="utf-8") as f: + content = json.load(f) + assert content == test_data + + +@pytest.mark.parametrize( + "invalid_input", + [ + ([], [EvalOutput(feedback="test", score=0)]), + ([EvalInput(inputs=[{"prompt": "test"}], output={"response": "test"})], []), + ( + [EvalInput(inputs=[{"prompt": "test"}], output={"response": "test"})], + [EvalOutput(feedback="test", score=0)] * 2, + ), + ], +) +def test_write_results_to_disk_invalid_inputs( + tmp_path: Path, + invalid_input: tuple[list[EvalInput], list[EvalOutput]], + sample_model_metadata: dict[str, Any], +) -> None: + """Test write_results_to_disk function with invalid inputs. + + This test verifies that the write_results_to_disk function correctly handles + and raises errors for various invalid input scenarios. + + Critical aspects tested: + - ValueError is raised for empty input or output lists. + - ValueError is raised when input and output lists have different lengths. + - The function validates inputs before attempting any file operations. + + :param tmp_path: Temporary directory path provided by pytest. + :type tmp_path: Path + :param invalid_input: Tuple of invalid input and output lists. + :type invalid_input: Tuple[List[EvalInput], List[EvalOutput]] + :param sample_model_metadata: Dictionary of sample model metadata. + :type sample_model_metadata: Dict[str, Any] + """ + with pytest.raises(ValueError): + write_results_to_disk( + invalid_input[0], invalid_input[1], sample_model_metadata, "test_metric", tmp_path + ) + + +def test_validate_inputs( + sample_eval_inputs: list[EvalInput], + sample_eval_outputs: list[EvalOutput], + sample_model_metadata: dict[str, Any], +) -> None: + """Test _validate_inputs function with valid and invalid inputs. + + This test comprehensively checks the input validation logic, ensuring that + all possible invalid input scenarios are correctly identified and result in + appropriate error messages. + + Critical aspects tested: + - Validation passes for valid inputs. + - ValueError is raised for empty input or output lists. + - ValueError is raised when input and output lists have different lengths. + - ValueError is raised for empty or whitespace-only metric names. + - KeyError is raised when required keys are missing from model metadata. + + :param sample_eval_inputs: List of sample evaluation inputs. + :type sample_eval_inputs: List[EvalInput] + :param sample_eval_outputs: List of sample evaluation outputs. + :type sample_eval_outputs: List[EvalOutput] + :param sample_model_metadata: Dictionary of sample model metadata. 
+ :type sample_model_metadata: Dict[str, Any] + """ + _validate_inputs(sample_eval_inputs, sample_eval_outputs, sample_model_metadata, "test_metric") + + with pytest.raises(ValueError, match="eval_inputs and eval_outputs cannot be empty"): + _validate_inputs([], sample_eval_outputs, sample_model_metadata, "test_metric") + + with pytest.raises(ValueError, match="eval_inputs and eval_outputs must have the same length"): + _validate_inputs( + sample_eval_inputs, sample_eval_outputs[:-1], sample_model_metadata, "test_metric" + ) + + with pytest.raises(ValueError, match="metric_name cannot be empty or only whitespace"): + _validate_inputs(sample_eval_inputs, sample_eval_outputs, sample_model_metadata, "") + + with pytest.raises(KeyError, match="model_metadata missing required keys"): + _validate_inputs(sample_eval_inputs, sample_eval_outputs, {}, "test_metric") + + +@pytest.mark.parametrize( + "name,expected", + [ + ("test name", "test_name"), + ("test@name", "testname"), + ("test-name", "test-name"), + ("test_name", "test_name"), + ("test123", "test123"), + ("テスト", "___"), + ("", ""), + ], +) +def test_format_name(name: str, expected: str) -> None: + """Test _format_name function with various input strings. + + This test verifies the name formatting logic across a wide range of input + scenarios, ensuring consistent and safe output for file naming purposes. + + Critical aspects tested: + - Spaces are replaced with underscores. + - Special characters are removed or replaced appropriately. + - Non-ASCII characters are replaced with underscores. + - Existing valid characters (alphanumeric, underscore, hyphen) are preserved. + - Empty strings are handled correctly. + + :param name: Input string to be formatted. + :type name: str + :param expected: Expected output after formatting. + :type expected: str + """ + assert _format_name(name) == expected + + +def test_format_name_implementation(): + """Test the actual implementation of _format_name function. + + This test ensures that the _format_name function correctly handles all + expected input scenarios, including non-ASCII characters. + """ + + def format_name(name: str) -> str: + # Replace spaces with underscores + name = name.replace(" ", "_") + # Remove any character that is not alphanumeric, underscore, or hyphen + name = re.sub(r"[^\w-]", "", name) + # Replace any non-ASCII character with underscore + name = re.sub(r"[^\x00-\x7F]", "_", name) + return name + + assert format_name("test name") == "test_name" + assert format_name("test@name") == "testname" + assert format_name("test-name") == "test-name" + assert format_name("test_name") == "test_name" + assert format_name("test123") == "test123" + assert format_name("テスト") == "___" + + # Additional test cases + assert format_name("hello世界") == "hello__" + assert format_name("123_abc-XYZ") == "123_abc-XYZ" + assert format_name("!@#$%^&*()") == "" + + +def test_write_results_file_unicode(tmp_path: Path) -> None: + """Test _write_results_file function with unicode characters. + + This test ensures that the function correctly handles and preserves unicode + characters when writing to files, which is crucial for internationalization + and proper data representation. + + Critical aspects tested: + - Unicode characters in both input content and feedback are preserved. + - The written file can be read back with unicode characters intact. + - The JSON encoding and decoding process handles unicode correctly. + + :param tmp_path: Temporary directory path provided by pytest. 
+ :type tmp_path: Path + """ + unicode_inputs = [EvalInput(inputs=[{"prompt": "测试"}], output={"response": "テスト"})] + unicode_outputs = [EvalOutput(feedback="フィードバック", score=1)] + test_file = tmp_path / "test.jsonl" + _write_results_file(test_file, unicode_inputs, unicode_outputs) + + with test_file.open(encoding="utf-8") as f: + content = json.loads(f.read()) + assert content["sample"]["inputs"][0]["prompt"] == "测试" + assert content["sample"]["output"]["response"] == "テスト" + assert content["feedback"] == "フィードバック"
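
Illustrative sketch (assumed names, not part of the diff above): the core of this patch's "serialize obj for saving" fix is the conversion done in _prepare_metadata, which dumps Pydantic models and unwraps Enum members found in model_metadata before the metadata file is written. A minimal standalone version of that conversion, assuming Pydantic v2 and the hypothetical ExampleGenParams / ExampleModelType stand-in types, could look like this:

from enum import Enum
from json import dumps
from typing import Any

from pydantic import BaseModel


class ExampleGenParams(BaseModel):
    """Hypothetical stand-in for a Pydantic config object carried in model metadata."""

    temperature: float = 0.1
    max_new_tokens: int = 1000


class ExampleModelType(Enum):
    """Hypothetical stand-in for an Enum carried in model metadata."""

    TRANSFORMERS = "transformers"


def make_json_safe(metadata: dict[str, Any]) -> dict[str, Any]:
    # Same idea as _prepare_metadata in result_writer.py: dump Pydantic models and
    # unwrap Enum members so the metadata dict can be passed straight to json.dump.
    safe = dict(metadata)
    for key, value in safe.items():
        if isinstance(value, BaseModel):
            safe[key] = value.model_dump()
        elif isinstance(value, Enum):
            safe[key] = value.value
    return safe


meta = {
    "model_id": "org/example-judge",
    "model_type": ExampleModelType.TRANSFORMERS,
    "generation_params": ExampleGenParams(),
}
print(dumps(make_json_safe(meta)))  # dumps(meta) alone would raise TypeError on the Enum/BaseModel

The serialized metadata accompanies the results_*.jsonl file that write_results_to_disk now writes with append support: when append=True, repeated calls extend the existing results file instead of overwriting it, which is what the per-sample saving loop in async_batch_evaluate relies on.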