From 30352c01015fc9fbcd098530b89e40ff01e67031 Mon Sep 17 00:00:00 2001 From: Mandolaro Date: Fri, 15 Aug 2025 14:47:59 -0700 Subject: [PATCH 1/3] add try except --- .../common/trainer/trainable_model.py | 156 ++++++---- src/judgeval/common/trainer/trainer.py | 282 +++++++++++------- 2 files changed, 261 insertions(+), 177 deletions(-) diff --git a/src/judgeval/common/trainer/trainable_model.py b/src/judgeval/common/trainer/trainable_model.py index acc2b56d..a220b3ed 100644 --- a/src/judgeval/common/trainer/trainable_model.py +++ b/src/judgeval/common/trainer/trainable_model.py @@ -2,6 +2,7 @@ from .config import TrainerConfig, ModelConfig from typing import Optional, Dict, Any, Callable from .console import _model_spinner_progress, _print_model_progress +from judgeval.common.exceptions import JudgmentAPIError class TrainableModel: @@ -20,13 +21,18 @@ def __init__(self, config: TrainerConfig): Args: config: TrainerConfig instance with model configuration """ - self.config = config - self.current_step = 0 - self._current_model = None - self._tracer_wrapper_func = None + try: + self.config = config + self.current_step = 0 + self._current_model = None + self._tracer_wrapper_func = None - self._base_model = self._create_base_model() - self._current_model = self._base_model + self._base_model = self._create_base_model() + self._current_model = self._base_model + except Exception as e: + raise JudgmentAPIError( + f"Failed to initialize TrainableModel: {str(e)}" + ) from e @classmethod def from_model_config(cls, model_config: ModelConfig) -> "TrainableModel": @@ -58,38 +64,48 @@ def from_model_config(cls, model_config: ModelConfig) -> "TrainableModel": def _create_base_model(self): """Create and configure the base model.""" - with _model_spinner_progress( - "Creating and deploying base model..." - ) as update_progress: - update_progress("Creating base model instance...") - base_model = LLM( - model=self.config.base_model_name, - deployment_type="on-demand", - id=self.config.deployment_id, - enable_addons=self.config.enable_addons, - ) - update_progress("Applying deployment configuration...") - base_model.apply() - _print_model_progress("Base model deployment ready") - return base_model + try: + with _model_spinner_progress( + "Creating and deploying base model..." + ) as update_progress: + update_progress("Creating base model instance...") + base_model = LLM( + model=self.config.base_model_name, + deployment_type="on-demand", + id=self.config.deployment_id, + enable_addons=self.config.enable_addons, + ) + update_progress("Applying deployment configuration...") + base_model.apply() + _print_model_progress("Base model deployment ready") + return base_model + except Exception as e: + raise JudgmentAPIError( + f"Failed to create and deploy base model '{self.config.base_model_name}': {str(e)}" + ) from e def _load_trained_model(self, model_name: str): """Load a trained model by name.""" - with _model_spinner_progress( - f"Loading and deploying trained model: {model_name}" - ) as update_progress: - update_progress("Creating trained model instance...") - self._current_model = LLM( - model=model_name, - deployment_type="on-demand-lora", - base_id=self.config.deployment_id, - ) - update_progress("Applying deployment configuration...") - self._current_model.apply() - _print_model_progress("Trained model deployment ready") + try: + with _model_spinner_progress( + f"Loading and deploying trained model: {model_name}" + ) as update_progress: + update_progress("Creating trained model instance...") + self._current_model = LLM( + model=model_name, + deployment_type="on-demand-lora", + base_id=self.config.deployment_id, + ) + update_progress("Applying deployment configuration...") + self._current_model.apply() + _print_model_progress("Trained model deployment ready") - if self._tracer_wrapper_func: - self._tracer_wrapper_func(self._current_model) + if self._tracer_wrapper_func: + self._tracer_wrapper_func(self._current_model) + except Exception as e: + raise JudgmentAPIError( + f"Failed to load and deploy trained model '{model_name}': {str(e)}" + ) from e def get_current_model(self): return self._current_model @@ -111,29 +127,32 @@ def advance_to_next_step(self, step: int): Args: step: The current training step number """ - self.current_step = step - - if step == 0: - self._current_model = self._base_model - else: - model_name = ( - f"accounts/{self.config.user_id}/models/{self.config.model_id}-v{step}" - ) - with _model_spinner_progress( - f"Creating and deploying model snapshot: {model_name}" - ) as update_progress: - update_progress("Creating model snapshot instance...") - self._current_model = LLM( - model=model_name, - deployment_type="on-demand-lora", - base_id=self.config.deployment_id, - ) - update_progress("Applying deployment configuration...") - self._current_model.apply() - _print_model_progress("Model snapshot deployment ready") - - if self._tracer_wrapper_func: - self._tracer_wrapper_func(self._current_model) + try: + self.current_step = step + + if step == 0: + self._current_model = self._base_model + else: + model_name = f"accounts/{self.config.user_id}/models/{self.config.model_id}-v{step}" + with _model_spinner_progress( + f"Creating and deploying model snapshot: {model_name}" + ) as update_progress: + update_progress("Creating model snapshot instance...") + self._current_model = LLM( + model=model_name, + deployment_type="on-demand-lora", + base_id=self.config.deployment_id, + ) + update_progress("Applying deployment configuration...") + self._current_model.apply() + _print_model_progress("Model snapshot deployment ready") + + if self._tracer_wrapper_func: + self._tracer_wrapper_func(self._current_model) + except Exception as e: + raise JudgmentAPIError( + f"Failed to advance to training step {step}: {str(e)}" + ) from e def perform_reinforcement_step(self, dataset, step: int): """ @@ -146,15 +165,20 @@ def perform_reinforcement_step(self, dataset, step: int): Returns: Training job object """ - model_name = f"{self.config.model_id}-v{step + 1}" - return self._current_model.reinforcement_step( - dataset=dataset, - output_model=model_name, - epochs=self.config.epochs, - learning_rate=self.config.learning_rate, - accelerator_count=self.config.accelerator_count, - accelerator_type=self.config.accelerator_type, - ) + try: + model_name = f"{self.config.model_id}-v{step + 1}" + return self._current_model.reinforcement_step( + dataset=dataset, + output_model=model_name, + epochs=self.config.epochs, + learning_rate=self.config.learning_rate, + accelerator_count=self.config.accelerator_count, + accelerator_type=self.config.accelerator_type, + ) + except Exception as e: + raise JudgmentAPIError( + f"Failed to start reinforcement learning step {step + 1}: {str(e)}" + ) from e def get_model_config( self, training_params: Optional[Dict[str, Any]] = None diff --git a/src/judgeval/common/trainer/trainer.py b/src/judgeval/common/trainer/trainer.py index 95d51284..fe0b7fb6 100644 --- a/src/judgeval/common/trainer/trainer.py +++ b/src/judgeval/common/trainer/trainer.py @@ -9,6 +9,7 @@ from judgeval.scorers import BaseScorer, APIScorerConfig from judgeval.data import Example from .console import _spinner_progress, _print_progress, _print_progress_update +from judgeval.common.exceptions import JudgmentAPIError class JudgmentTrainer: @@ -35,17 +36,22 @@ def __init__( trainable_model: Optional trainable model instance project_name: Project name for organizing training runs and evaluations """ - self.config = config - self.tracer = tracer - self.tracer.show_trace_urls = False - self.project_name = project_name or "judgment_training" - - if trainable_model is None: - self.trainable_model = TrainableModel(self.config) - else: - self.trainable_model = trainable_model - - self.judgment_client = JudgmentClient() + try: + self.config = config + self.tracer = tracer + self.tracer.show_trace_urls = False + self.project_name = project_name or "judgment_training" + + if trainable_model is None: + self.trainable_model = TrainableModel(self.config) + else: + self.trainable_model = trainable_model + + self.judgment_client = JudgmentClient() + except Exception as e: + raise JudgmentAPIError( + f"Failed to initialize JudgmentTrainer: {str(e)}" + ) from e async def generate_rollouts_and_rewards( self, @@ -83,45 +89,52 @@ async def generate_rollouts_and_rewards( @self.tracer.observe(span_type="function") async def generate_single_response(prompt_id, generation_id): - async with semaphore: - prompt_input = prompts[prompt_id] - response_data = await agent_function(**prompt_input) - messages = response_data.get("messages", []) - - try: - traced_messages = self.tracer.get_current_message_history() - if traced_messages: - messages = traced_messages - except Exception as e: - print(f"Warning: Failed to get message history from trace: {e}") - pass - - example = Example( - input=prompt_input, messages=messages, actual_output=response_data - ) - - scoring_results = self.judgment_client.run_evaluation( - examples=[example], - scorers=scorers, - project_name=self.project_name, - eval_run_name=f"training_step_{self.trainable_model.current_step}_prompt_{prompt_id}_gen_{generation_id}", - show_url=False, - ) - - if scoring_results and scoring_results[0].scorers_data: - reward = sum( - scorer_data.score - for scorer_data in scoring_results[0].scorers_data - ) / len(scoring_results[0].scorers_data) - else: - reward = 0.0 - - return { - "prompt_id": prompt_id, - "generation_id": generation_id, - "messages": messages, - "evals": {"score": reward}, - } + try: + async with semaphore: + prompt_input = prompts[prompt_id] + response_data = await agent_function(**prompt_input) + messages = response_data.get("messages", []) + + try: + traced_messages = self.tracer.get_current_message_history() + if traced_messages: + messages = traced_messages + except Exception as e: + print(f"Warning: Failed to get message history from trace: {e}") + pass + + example = Example( + input=prompt_input, + messages=messages, + actual_output=response_data, + ) + + scoring_results = self.judgment_client.run_evaluation( + examples=[example], + scorers=scorers, + project_name=self.project_name, + eval_run_name=f"training_step_{self.trainable_model.current_step}_prompt_{prompt_id}_gen_{generation_id}", + show_url=False, + ) + + if scoring_results and scoring_results[0].scorers_data: + reward = sum( + scorer_data.score + for scorer_data in scoring_results[0].scorers_data + ) / len(scoring_results[0].scorers_data) + else: + reward = 0.0 + + return { + "prompt_id": prompt_id, + "generation_id": generation_id, + "messages": messages, + "evals": {"score": reward}, + } + except Exception as e: + raise JudgmentAPIError( + f"Failed to generate rollout for prompt {prompt_id}, generation {generation_id}: {str(e)}" + ) from e coros = [] for prompt_id in range(num_prompts_per_step): @@ -140,16 +153,21 @@ async def generate_single_response(prompt_id, generation_id): _print_progress(f"Generated {len(results)} rollouts successfully") - dataset_rows = [] - for prompt_id in range(num_prompts_per_step): - prompt_generations = [r for r in results if r["prompt_id"] == prompt_id] - sample_generations = [ - {"messages": gen["messages"], "evals": gen["evals"]} - for gen in prompt_generations - ] - dataset_rows.append({"samples": sample_generations}) - - return dataset_rows + try: + dataset_rows = [] + for prompt_id in range(num_prompts_per_step): + prompt_generations = [r for r in results if r["prompt_id"] == prompt_id] + sample_generations = [ + {"messages": gen["messages"], "evals": gen["evals"]} + for gen in prompt_generations + ] + dataset_rows.append({"samples": sample_generations}) + + return dataset_rows + except Exception as e: + raise JudgmentAPIError( + f"Failed to process rollout results into dataset format: {str(e)}" + ) from e async def run_reinforcement_learning( self, @@ -197,61 +215,95 @@ async def run_reinforcement_learning( f"Starting training step {step_num}", step_num, self.config.num_steps ) - self.trainable_model.advance_to_next_step(step) - - dataset_rows = await self.generate_rollouts_and_rewards( - agent_function, scorers, prompts - ) + try: + self.trainable_model.advance_to_next_step(step) - with _spinner_progress( - "Preparing training dataset", step_num, self.config.num_steps - ): - dataset = Dataset.from_list(dataset_rows) - dataset.sync() + dataset_rows = await self.generate_rollouts_and_rewards( + agent_function, scorers, prompts + ) - _print_progress( - "Starting reinforcement training", step_num, self.config.num_steps - ) - job = self.trainable_model.perform_reinforcement_step(dataset, step) - - last_state = None - with _spinner_progress( - "Training job in progress", step_num, self.config.num_steps - ): - while not job.is_completed: - job.raise_if_bad_state() - current_state = job.state - - if current_state != last_state: - if current_state in ["uploading", "validating"]: - _print_progress_update( - f"Training job: {current_state} data" - ) - elif current_state == "training": - _print_progress_update( - "Training job: model training in progress" - ) - else: - _print_progress_update(f"Training job: {current_state}") - last_state = current_state - - time.sleep(10) - job = job.get() - if job is None: - raise Exception("Job was deleted while waiting for completion") + with _spinner_progress( + "Preparing training dataset", step_num, self.config.num_steps + ): + try: + dataset = Dataset.from_list(dataset_rows) + dataset.sync() + except Exception as e: + raise JudgmentAPIError( + f"Failed to create training dataset for step {step_num}: {str(e)}" + ) from e + + _print_progress( + "Starting reinforcement training", step_num, self.config.num_steps + ) + job = self.trainable_model.perform_reinforcement_step(dataset, step) + + last_state = None + with _spinner_progress( + "Training job in progress", step_num, self.config.num_steps + ): + try: + while not job.is_completed: + job.raise_if_bad_state() + current_state = job.state + + if current_state != last_state: + if current_state in ["uploading", "validating"]: + _print_progress_update( + f"Training job: {current_state} data" + ) + elif current_state == "training": + _print_progress_update( + "Training job: model training in progress" + ) + else: + _print_progress_update( + f"Training job: {current_state}" + ) + last_state = current_state + + time.sleep(10) + job = job.get() + if job is None: + raise JudgmentAPIError( + "Training job was deleted while waiting for completion" + ) + except Exception as e: + if "Training job was deleted" in str(e): + raise e # Re-raise JudgmentAPIError as-is + raise JudgmentAPIError( + f"Training job failed during step {step_num}: {str(e)}" + ) from e + + _print_progress( + f"Training completed! New model: {job.output_model}", + step_num, + self.config.num_steps, + ) - _print_progress( - f"Training completed! New model: {job.output_model}", - step_num, - self.config.num_steps, - ) + try: + dataset.delete() + except Exception as e: + # Log warning but don't fail the training + print(f"Warning: Failed to delete training dataset: {e}") - dataset.delete() + except JudgmentAPIError: + # Re-raise JudgmentAPIError as-is + raise + except Exception as e: + raise JudgmentAPIError( + f"Training step {step_num} failed: {str(e)}" + ) from e _print_progress("All training steps completed!") - with _spinner_progress("Deploying final trained model"): - self.trainable_model.advance_to_next_step(self.config.num_steps) + try: + with _spinner_progress("Deploying final trained model"): + self.trainable_model.advance_to_next_step(self.config.num_steps) + except Exception as e: + raise JudgmentAPIError( + f"Failed to deploy final trained model: {str(e)}" + ) from e return self.trainable_model.get_model_config(training_params) @@ -277,7 +329,15 @@ async def train( Returns: ModelConfig: Configuration of the trained model for future loading """ - if rft_provider is not None: - self.config.rft_provider = rft_provider + try: + if rft_provider is not None: + self.config.rft_provider = rft_provider - return await self.run_reinforcement_learning(agent_function, scorers, prompts) + return await self.run_reinforcement_learning( + agent_function, scorers, prompts + ) + except JudgmentAPIError: + # Re-raise JudgmentAPIError as-is + raise + except Exception as e: + raise JudgmentAPIError(f"Training process failed: {str(e)}") from e From e9ab2d775b6b01d4584823701cc017565f65a738 Mon Sep 17 00:00:00 2001 From: Mandolaro Date: Fri, 15 Aug 2025 14:58:42 -0700 Subject: [PATCH 2/3] cleanup try except --- src/judgeval/common/trainer/trainer.py | 240 ++++++++++--------------- 1 file changed, 99 insertions(+), 141 deletions(-) diff --git a/src/judgeval/common/trainer/trainer.py b/src/judgeval/common/trainer/trainer.py index fe0b7fb6..2ca0b092 100644 --- a/src/judgeval/common/trainer/trainer.py +++ b/src/judgeval/common/trainer/trainer.py @@ -89,52 +89,47 @@ async def generate_rollouts_and_rewards( @self.tracer.observe(span_type="function") async def generate_single_response(prompt_id, generation_id): - try: - async with semaphore: - prompt_input = prompts[prompt_id] - response_data = await agent_function(**prompt_input) - messages = response_data.get("messages", []) - - try: - traced_messages = self.tracer.get_current_message_history() - if traced_messages: - messages = traced_messages - except Exception as e: - print(f"Warning: Failed to get message history from trace: {e}") - pass - - example = Example( - input=prompt_input, - messages=messages, - actual_output=response_data, - ) - - scoring_results = self.judgment_client.run_evaluation( - examples=[example], - scorers=scorers, - project_name=self.project_name, - eval_run_name=f"training_step_{self.trainable_model.current_step}_prompt_{prompt_id}_gen_{generation_id}", - show_url=False, - ) - - if scoring_results and scoring_results[0].scorers_data: - reward = sum( - scorer_data.score - for scorer_data in scoring_results[0].scorers_data - ) / len(scoring_results[0].scorers_data) - else: - reward = 0.0 - - return { - "prompt_id": prompt_id, - "generation_id": generation_id, - "messages": messages, - "evals": {"score": reward}, - } - except Exception as e: - raise JudgmentAPIError( - f"Failed to generate rollout for prompt {prompt_id}, generation {generation_id}: {str(e)}" - ) from e + async with semaphore: + prompt_input = prompts[prompt_id] + response_data = await agent_function(**prompt_input) + messages = response_data.get("messages", []) + + try: + traced_messages = self.tracer.get_current_message_history() + if traced_messages: + messages = traced_messages + except Exception as e: + print(f"Warning: Failed to get message history from trace: {e}") + pass + + example = Example( + input=prompt_input, + messages=messages, + actual_output=response_data, + ) + + scoring_results = self.judgment_client.run_evaluation( + examples=[example], + scorers=scorers, + project_name=self.project_name, + eval_run_name=f"training_step_{self.trainable_model.current_step}_prompt_{prompt_id}_gen_{generation_id}", + show_url=False, + ) + + if scoring_results and scoring_results[0].scorers_data: + reward = sum( + scorer_data.score + for scorer_data in scoring_results[0].scorers_data + ) / len(scoring_results[0].scorers_data) + else: + reward = 0.0 + + return { + "prompt_id": prompt_id, + "generation_id": generation_id, + "messages": messages, + "evals": {"score": reward}, + } coros = [] for prompt_id in range(num_prompts_per_step): @@ -153,21 +148,16 @@ async def generate_single_response(prompt_id, generation_id): _print_progress(f"Generated {len(results)} rollouts successfully") - try: - dataset_rows = [] - for prompt_id in range(num_prompts_per_step): - prompt_generations = [r for r in results if r["prompt_id"] == prompt_id] - sample_generations = [ - {"messages": gen["messages"], "evals": gen["evals"]} - for gen in prompt_generations - ] - dataset_rows.append({"samples": sample_generations}) - - return dataset_rows - except Exception as e: - raise JudgmentAPIError( - f"Failed to process rollout results into dataset format: {str(e)}" - ) from e + dataset_rows = [] + for prompt_id in range(num_prompts_per_step): + prompt_generations = [r for r in results if r["prompt_id"] == prompt_id] + sample_generations = [ + {"messages": gen["messages"], "evals": gen["evals"]} + for gen in prompt_generations + ] + dataset_rows.append({"samples": sample_generations}) + + return dataset_rows async def run_reinforcement_learning( self, @@ -215,95 +205,63 @@ async def run_reinforcement_learning( f"Starting training step {step_num}", step_num, self.config.num_steps ) - try: - self.trainable_model.advance_to_next_step(step) + self.trainable_model.advance_to_next_step(step) - dataset_rows = await self.generate_rollouts_and_rewards( - agent_function, scorers, prompts - ) + dataset_rows = await self.generate_rollouts_and_rewards( + agent_function, scorers, prompts + ) - with _spinner_progress( - "Preparing training dataset", step_num, self.config.num_steps - ): - try: - dataset = Dataset.from_list(dataset_rows) - dataset.sync() - except Exception as e: - raise JudgmentAPIError( - f"Failed to create training dataset for step {step_num}: {str(e)}" - ) from e + with _spinner_progress( + "Preparing training dataset", step_num, self.config.num_steps + ): + dataset = Dataset.from_list(dataset_rows) + dataset.sync() - _print_progress( - "Starting reinforcement training", step_num, self.config.num_steps - ) - job = self.trainable_model.perform_reinforcement_step(dataset, step) - - last_state = None - with _spinner_progress( - "Training job in progress", step_num, self.config.num_steps - ): - try: - while not job.is_completed: - job.raise_if_bad_state() - current_state = job.state - - if current_state != last_state: - if current_state in ["uploading", "validating"]: - _print_progress_update( - f"Training job: {current_state} data" - ) - elif current_state == "training": - _print_progress_update( - "Training job: model training in progress" - ) - else: - _print_progress_update( - f"Training job: {current_state}" - ) - last_state = current_state - - time.sleep(10) - job = job.get() - if job is None: - raise JudgmentAPIError( - "Training job was deleted while waiting for completion" - ) - except Exception as e: - if "Training job was deleted" in str(e): - raise e # Re-raise JudgmentAPIError as-is + _print_progress( + "Starting reinforcement training", step_num, self.config.num_steps + ) + job = self.trainable_model.perform_reinforcement_step(dataset, step) + + last_state = None + with _spinner_progress( + "Training job in progress", step_num, self.config.num_steps + ): + while not job.is_completed: + job.raise_if_bad_state() + current_state = job.state + + if current_state != last_state: + if current_state in ["uploading", "validating"]: + _print_progress_update( + f"Training job: {current_state} data" + ) + elif current_state == "training": + _print_progress_update( + "Training job: model training in progress" + ) + else: + _print_progress_update(f"Training job: {current_state}") + last_state = current_state + + time.sleep(10) + job = job.get() + if job is None: raise JudgmentAPIError( - f"Training job failed during step {step_num}: {str(e)}" - ) from e - - _print_progress( - f"Training completed! New model: {job.output_model}", - step_num, - self.config.num_steps, - ) + "Training job was deleted while waiting for completion" + ) - try: - dataset.delete() - except Exception as e: - # Log warning but don't fail the training - print(f"Warning: Failed to delete training dataset: {e}") + _print_progress( + f"Training completed! New model: {job.output_model}", + step_num, + self.config.num_steps, + ) - except JudgmentAPIError: - # Re-raise JudgmentAPIError as-is - raise - except Exception as e: - raise JudgmentAPIError( - f"Training step {step_num} failed: {str(e)}" - ) from e + dataset.delete() _print_progress("All training steps completed!") - try: - with _spinner_progress("Deploying final trained model"): - self.trainable_model.advance_to_next_step(self.config.num_steps) - except Exception as e: - raise JudgmentAPIError( - f"Failed to deploy final trained model: {str(e)}" - ) from e + with _spinner_progress("Deploying final trained model"): + self.trainable_model.advance_to_next_step(self.config.num_steps) return self.trainable_model.get_model_config(training_params) From 727a450abc86da656bff75d7f1e883d698e22ddb Mon Sep 17 00:00:00 2001 From: Mandolaro Date: Fri, 15 Aug 2025 17:50:13 -0700 Subject: [PATCH 3/3] fix config --- src/judgeval/common/trainer/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/judgeval/common/trainer/config.py b/src/judgeval/common/trainer/config.py index 4bd5a2d5..aeb1d78c 100644 --- a/src/judgeval/common/trainer/config.py +++ b/src/judgeval/common/trainer/config.py @@ -14,7 +14,7 @@ class TrainerConfig: rft_provider: str = "fireworks" num_steps: int = 5 num_generations_per_prompt: int = ( - 5 # Number of rollouts/generations per input prompt + 4 # Number of rollouts/generations per input prompt ) num_prompts_per_step: int = 4 # Number of input prompts to sample per training step concurrency: int = 100