diff --git a/e2etests/judgment_client_test.py b/e2etests/judgment_client_test.py index d9740941..31d7dd79 100644 --- a/e2etests/judgment_client_test.py +++ b/e2etests/judgment_client_test.py @@ -29,7 +29,6 @@ def test_dataset(client: JudgmentClient): # PULL dataset = client.pull_dataset(alias="test_dataset_5") print(dataset) - def test_run_eval(client: JudgmentClient): diff --git a/e2etests/test_tracer.py b/e2etests/test_tracer.py index 15ed35ee..95cf8453 100644 --- a/e2etests/test_tracer.py +++ b/e2etests/test_tracer.py @@ -1,9 +1,16 @@ +# Standard library imports +import os +import time +import asyncio + +# Third-party imports from openai import OpenAI from together import Together from anthropic import Anthropic -from judgeval.common.tracer import Tracer, wrap -import time +# Local imports +from judgeval.common.tracer import Tracer, wrap +from judgeval.constants import APIScorer # Initialize the tracer and clients judgment = Tracer(api_key=os.getenv("JUDGMENT_API_KEY")) @@ -11,46 +18,115 @@ anthropic_client = wrap(Anthropic()) @judgment.observe -def make_upper(input): - return input.upper() - -@judgment.observe -def make_lower(input): - return input.lower() +async def make_upper(input: str) -> str: + """Convert input to uppercase and evaluate using judgment API. + + Args: + input: The input string to convert + Returns: + The uppercase version of the input string + """ + output = input.upper() + await judgment.get_current_trace().async_evaluate( + input="What if these shoes don't fit?", + actual_output="We offer a 30-day full refund at no extra cost.", + retrieval_context=["All customers are eligible for a 30 day full refund at no extra cost."], + expected_output="We offer a 30-day full refund at no extra cost.", + expected_tools=["refund"], + score_type=APIScorer.FAITHFULNESS, + threshold=0.5, + model="gpt-4o-mini", + log_results=True + ) + return output @judgment.observe -def make_poem(input): +async def make_lower(input): + output = input.lower() - # Using Anthropic API - anthropic_response = anthropic_client.messages.create( - model="claude-3-sonnet-20240229", - messages=[{ - "role": "user", - "content": input - }], - max_tokens=30 + await judgment.get_current_trace().async_evaluate( + input="How do I reset my password?", + actual_output="You can reset your password by clicking on 'Forgot Password' at the login screen.", + expected_output="You can reset your password by clicking on 'Forgot Password' at the login screen.", + context=["User Account"], + retrieval_context=["Password reset instructions"], + tools_called=["authentication"], + expected_tools=["authentication"], + additional_metadata={"difficulty": "medium"}, + score_type=APIScorer.ANSWER_RELEVANCY, + threshold=0.5, + model="gpt-4o-mini", + log_results=True ) - anthropic_result = anthropic_response.content[0].text - - # Using OpenAI API - openai_response = openai_client.chat.completions.create( + return output + +@judgment.observe +def llm_call(input): + return "We have a 30 day full refund policy on shoes." 
+ +@judgment.observe +async def answer_user_question(input): + output = llm_call(input) + await judgment.get_current_trace().async_evaluate( + input=input, + actual_output=output, + retrieval_context=["All customers are eligible for a 30 day full refund at no extra cost."], + expected_output="We offer a 30-day full refund at no extra cost.", + score_type=APIScorer.ANSWER_RELEVANCY, + threshold=0.5, model="gpt-4o-mini", - messages=[ - {"role": "system", "content": "Make a short sentence with the input."}, - {"role": "user", "content": input} - ] + log_results=True ) - openai_result = openai_response.choices[0].message.content - print(openai_result) + return output + +@judgment.observe +async def make_poem(input: str) -> str: + """Generate a poem using both Anthropic and OpenAI APIs. - return make_lower(anthropic_result + openai_result) + Args: + input: The prompt for poem generation + Returns: + Combined and lowercase version of both API responses + """ + try: + # Using Anthropic API + anthropic_response = anthropic_client.messages.create( + model="claude-3-sonnet-20240229", + messages=[{"role": "user", "content": input}], + max_tokens=30 + ) + anthropic_result = anthropic_response.content[0].text + + # Using OpenAI API + openai_response = openai_client.chat.completions.create( + model="gpt-4o-mini", + messages=[ + {"role": "system", "content": "Make a short sentence with the input."}, + {"role": "user", "content": input} + ] + ) + openai_result = openai_response.choices[0].message.content + + return await make_lower(f"{anthropic_result} {openai_result}") + + except Exception as e: + print(f"Error generating poem: {e}") + return "" -def test_evaluation_mixed(input): +async def test_evaluation_mixed(input): with judgment.trace("test_evaluation") as trace: - result = make_poem(make_upper(input)) + upper = await make_upper(input) + result = await make_poem(upper) + await answer_user_question("What if these shoes don't fit?") - trace.print() trace.save() + + trace.print() + return result -result3 = test_evaluation_mixed("hello the world is flat") +if __name__ == "__main__": + # Use a more meaningful test input + test_input = "Write a poem about Nissan R32 GTR" + asyncio.run(test_evaluation_mixed(test_input)) + diff --git a/judgeval/common/tracer.py b/judgeval/common/tracer.py index a4b4464a..5d605d63 100644 --- a/judgeval/common/tracer.py +++ b/judgeval/common/tracer.py @@ -7,43 +7,51 @@ import requests import uuid from contextlib import contextmanager -from typing import Optional, Any, List, Literal, Tuple, Generator -from dataclasses import dataclass +from typing import Optional, Any, List, Literal, Tuple, Generator, TypeAlias, Union +from dataclasses import dataclass, field from datetime import datetime from openai import OpenAI from together import Together from anthropic import Anthropic +from typing import Dict +import inspect +import asyncio +import json +import warnings +from pydantic import BaseModel from judgeval.constants import JUDGMENT_TRACES_SAVE_API_URL +from judgeval.judgment_client import JudgmentClient +from judgeval.data import Example +from judgeval.scorers import JudgmentScorer +from judgeval.data.result import ScoringResult + +# Define type aliases for better code readability and maintainability +ApiClient: TypeAlias = Union[OpenAI, Together, Anthropic] # Supported API clients +TraceEntryType = Literal['enter', 'exit', 'output', 'input', 'evaluation'] # Valid trace entry types @dataclass class TraceEntry: - """ - Represents a single trace entry with its visual 
representation + """Represents a single trace entry with its visual representation. - Each TraceEntry is a single line in the trace. - The `type` field determines the visual representation of the entry. - - `enter` is for when a function is entered, represented by `→` - - `exit` is for when a function is exited, represented by `←` - - `output` is for when a function outputs a value, represented by `Output:` - - `input` is for function input parameters, represented by `Input:` - - function: Name of the function being traced - depth: Indentation level of this trace entry - message: Additional message to include in the trace - timestamp: Time when this trace entry was created - duration: For 'exit' entries, how long the function took to execute - output: For 'output' entries, the value that was output + Visual representations: + - enter: → (function entry) + - exit: ← (function exit) + - output: Output: (function return value) + - input: Input: (function parameters) + - evaluation: Evaluation: (evaluation results) """ - type: Literal['enter', 'exit', 'output', 'input'] - function: str - depth: int - message: str - timestamp: float - duration: Optional[float] = None - output: Any = None - inputs: dict = None - + type: TraceEntryType + function: str # Name of the function being traced + depth: int # Indentation level for nested calls + message: str # Human-readable description + timestamp: float # Unix timestamp when entry was created + duration: Optional[float] = None # Time taken (for exit/evaluation entries) + output: Any = None # Function output value + # Use field() for mutable defaults to avoid shared state issues + inputs: dict = field(default_factory=dict) + evaluation_result: Optional[List[ScoringResult]] = field(default=None) + def print_entry(self): indent = " " * self.depth if self.type == "enter": @@ -54,25 +62,19 @@ def print_entry(self): print(f"{indent}Output: {self.output}") elif self.type == "input": print(f"{indent}Input: {self.inputs}") + elif self.type == "evaluation": + print(f"{indent}Evaluation: {self.evaluation_result} ({self.duration:.3f}s)") - def to_dict(self): - """Convert the trace entry to a dictionary format""" - # Try to serialize output to check if it's JSON serializable + def to_dict(self) -> dict: + """Convert the trace entry to a dictionary format for storage/transmission.""" try: - # If output is a Pydantic model, serialize it - from pydantic import BaseModel - if isinstance(self.output, BaseModel): - output = self.output.model_dump() - else: - # Test regular JSON serialization - import json - json.dumps(self.output) - output = self.output + output = self._serialize_output() except (TypeError, OverflowError, ValueError): - import warnings + # Handle cases where output cannot be serialized warnings.warn(f"Output for function {self.function} is not JSON serializable. Setting to None.") output = None + # Build a complete dictionary representation of the trace entry return { "type": self.type, "function": self.function, @@ -81,15 +83,31 @@ def to_dict(self): "timestamp": self.timestamp, "duration": self.duration, "output": output, - "inputs": self.inputs if self.inputs else None + "inputs": self.inputs or None, # Convert empty dict to None + "evaluation_result": [result.to_dict() for result in self.evaluation_result] if self.evaluation_result else None } + def _serialize_output(self) -> Any: + """Helper method to serialize output data safely. 
+ + Handles special cases: + - Pydantic models are converted using model_dump() + - Other objects must be JSON serializable + """ + if isinstance(self.output, BaseModel): + return self.output.model_dump() + + # Verify JSON serialization is possible + json.dumps(self.output) + return self.output + class TraceClient: """Client for managing a single trace context""" def __init__(self, tracer, trace_id: str, name: str): self.tracer = tracer self.trace_id = trace_id self.name = name + self.client: JudgmentClient = tracer.client self.entries: List[TraceEntry] = [] self.start_time = time.time() self._current_span = None @@ -128,6 +146,63 @@ def span(self, name: str): duration=duration )) self._current_span = prev_span + + async def async_evaluate( + self, + input: Optional[str] = None, + actual_output: Optional[str] = None, + expected_output: Optional[str] = None, + context: Optional[List[str]] = None, + retrieval_context: Optional[List[str]] = None, + tools_called: Optional[List[str]] = None, + expected_tools: Optional[List[str]] = None, + additional_metadata: Optional[Dict[str, Any]] = None, + score_type: Optional[str] = None, + threshold: Optional[float] = None, + model: Optional[str] = None, + log_results: Optional[bool] = False, + ): + start_time = time.time() # Record start time + example = Example( + input=input, + actual_output=actual_output, + expected_output=expected_output, + context=context, + retrieval_context=retrieval_context, + tools_called=tools_called, + expected_tools=expected_tools, + additional_metadata=additional_metadata, + trace_id=self.trace_id + ) + scorer = JudgmentScorer( + score_type=score_type, + threshold=threshold + ) + _, scoring_results = self.client.run_evaluation( + examples=[example], + scorers=[scorer], + model=model, + metadata={}, + log_results=log_results, + project_name="TestSpanLevel", + eval_run_name="TestSpanLevel", + ) + + self.record_evaluation(scoring_results, start_time) # Pass start_time to record_evaluation + + def record_evaluation(self, results: List[ScoringResult], start_time: float): + """Record evaluation results for the current span""" + if self._current_span: + duration = time.time() - start_time # Calculate duration from start_time + self.add_entry(TraceEntry( + type="evaluation", + function=self._current_span, + depth=self.tracer.depth, + message=f"Evaluation results for {self._current_span}", + timestamp=time.time(), + evaluation_result=results, + duration=duration + )) def record_input(self, inputs: dict): """Record input parameters for the current span""" @@ -141,17 +216,32 @@ def record_input(self, inputs: dict): inputs=inputs )) + async def _update_coroutine_output(self, entry: TraceEntry, coroutine: Any): + """Helper method to update the output of a trace entry once the coroutine completes""" + try: + result = await coroutine + entry.output = result + return result + except Exception as e: + entry.output = f"Error: {str(e)}" + raise + def record_output(self, output: Any): """Record output for the current span""" if self._current_span: - self.add_entry(TraceEntry( + entry = TraceEntry( type="output", function=self._current_span, depth=self.tracer.depth, message=f"Output from {self._current_span}", timestamp=time.time(), - output=output - )) + output="" if inspect.iscoroutine(output) else output + ) + self.add_entry(entry) + + if inspect.iscoroutine(output): + # Create a task to update the output once the coroutine completes + asyncio.create_task(self._update_coroutine_output(entry, output)) def add_entry(self, entry: TraceEntry): 
"""Add a trace entry to this trace context""" @@ -179,6 +269,7 @@ def condense_trace(self, entries: List[dict]) -> List[dict]: - function: function name - inputs: non-None inputs - output: non-None outputs + - evaluation_result: evaluation results - timestamp: first timestamp of the function call """ condensed = [] @@ -194,7 +285,8 @@ def condense_trace(self, entries: List[dict]) -> List[dict]: "function": entry["function"], "timestamp": entry["timestamp"], "inputs": None, - "output": None + "output": None, + "evaluation_result": None } elif entry["type"] == "exit" and entry["function"] == current_func: @@ -214,6 +306,9 @@ def condense_trace(self, entries: List[dict]) -> List[dict]: if entry["type"] == "output" and entry["output"]: current_entry["output"] = entry["output"] + + if entry["type"] == "evaluation" and entry["evaluation_result"]: + current_entry["evaluation_result"] = entry["evaluation_result"] return condensed @@ -263,19 +358,18 @@ def __new__(cls, *args, **kwargs): cls._instance = super(Tracer, cls).__new__(cls) return cls._instance - def __init__(self, api_key: Optional[str] = None): + def __init__(self, api_key: str): if not hasattr(self, 'initialized'): if not api_key: - raise ValueError("Tracer must be configured with an API key first") + raise ValueError("Tracer must be configured with a Judgment API key") + self.api_key = api_key + self.client = JudgmentClient(judgment_api_key=api_key) self.depth = 0 self._current_trace: Optional[TraceClient] = None self.initialized = True - if api_key: - self.api_key = api_key - @contextmanager def trace(self, name: str = None) -> Generator[TraceClient, None, None]: """Start a new trace context using a context manager""" @@ -287,9 +381,17 @@ def trace(self, name: str = None) -> Generator[TraceClient, None, None]: # Automatically create top-level span with trace.span(name or "unnamed_trace") as span: try: + # Save the trace to the database to handle Evaluations' trace_id referential integrity + trace.save() yield trace finally: self._current_trace = prev_trace + + def get_current_trace(self) -> Optional[TraceClient]: + """ + Get the current trace context + """ + return self._current_trace def observe(self, func=None, *, name=None): """ @@ -302,29 +404,52 @@ def observe(self, func=None, *, name=None): if func is None: return lambda f: self.observe(f, name=name) - @functools.wraps(func) - def wrapper(*args, **kwargs): - if self._current_trace: - span_name = name or func.__name__ - - with self._current_trace.span(span_name) as span: - # Record inputs - span.record_input({ - 'args': list(args), - 'kwargs': kwargs - }) - - # Execute function - result = func(*args, **kwargs) + if asyncio.iscoroutinefunction(func): + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): + if self._current_trace: + span_name = name or func.__name__ - # Record output - span.record_output(result) + with self._current_trace.span(span_name) as span: + # Record inputs + span.record_input({ + 'args': list(args), + 'kwargs': kwargs + }) + + # Execute function + result = await func(*args, **kwargs) + + # Record output + span.record_output(result) + + return result + + return await func(*args, **kwargs) + return async_wrapper + else: + @functools.wraps(func) + def wrapper(*args, **kwargs): + if self._current_trace: + span_name = name or func.__name__ - return result - - return func(*args, **kwargs) - - return wrapper + with self._current_trace.span(span_name) as span: + # Record inputs + span.record_input({ + 'args': list(args), + 'kwargs': kwargs + }) + + # 
Execute function + result = func(*args, **kwargs) + + # Record output + span.record_output(result) + + return result + + return func(*args, **kwargs) + return wrapper def wrap(client: Any) -> Any: """ @@ -333,61 +458,26 @@ def wrap(client: Any) -> Any: """ tracer = Tracer._instance # Get the global tracer instance - if isinstance(client, OpenAI) or isinstance(client, Together): - original_create = client.chat.completions.create - elif isinstance(client, Anthropic): - original_create = client.messages.create - else: - raise ValueError(f"Unsupported client type: {type(client)}") + # Get the appropriate configuration for this client type + span_name, original_create = _get_client_config(client) def traced_create(*args, **kwargs): + # Skip tracing if no active trace if not (tracer and tracer._current_trace): return original_create(*args, **kwargs) - # TODO: this is dangerous and prone to errors in future updates to how the class works. - # If we add more model providers here, we need to add support for it here in the span names - span_name = "OPENAI_API_CALL" if isinstance(client, OpenAI) else "TOGETHER_API_CALL" if isinstance(client, Together) else "ANTHROPIC_API_CALL" with tracer._current_trace.span(span_name) as span: - # Record the input based on client type - if isinstance(client, (OpenAI, Together)): - input_data = { - "model": kwargs.get("model"), - "messages": kwargs.get("messages"), - } - elif isinstance(client, Anthropic): - input_data = { - "model": kwargs.get("model"), - "messages": kwargs.get("messages"), - "max_tokens": kwargs.get("max_tokens") - } - + # Format and record the input parameters + input_data = _format_input_data(client, **kwargs) span.record_input(input_data) - # Make the API call + # Make the actual API call response = original_create(*args, **kwargs) - # Record the output based on client type - if isinstance(client, (OpenAI, Together)): - output_data = { - "content": response.choices[0].message.content, - "usage": { - "prompt_tokens": response.usage.prompt_tokens, - "completion_tokens": response.usage.completion_tokens, - "total_tokens": response.usage.total_tokens - } - } - - elif isinstance(client, Anthropic): - output_data = { - "content": response.content[0].text, - "usage": { - "input_tokens": response.usage.input_tokens, - "output_tokens": response.usage.output_tokens, - "total_tokens": response.usage.input_tokens + response.usage.output_tokens - } - } - + # Format and record the output + output_data = _format_output_data(client, response) span.record_output(output_data) + return response # Replace the original method with our traced version @@ -397,3 +487,75 @@ def traced_create(*args, **kwargs): client.messages.create = traced_create return client + +# Helper functions for client-specific operations + +def _get_client_config(client: ApiClient) -> tuple[str, callable]: + """Returns configuration tuple for the given API client. 
+ + Args: + client: An instance of OpenAI, Together, or Anthropic client + + Returns: + tuple: (span_name, create_method) + - span_name: String identifier for tracing + - create_method: Reference to the client's creation method + + Raises: + ValueError: If client type is not supported + """ + if isinstance(client, OpenAI): + return "OPENAI_API_CALL", client.chat.completions.create + elif isinstance(client, Together): + return "TOGETHER_API_CALL", client.chat.completions.create + elif isinstance(client, Anthropic): + return "ANTHROPIC_API_CALL", client.messages.create + raise ValueError(f"Unsupported client type: {type(client)}") + +def _format_input_data(client: ApiClient, **kwargs) -> dict: + """Format input parameters based on client type. + + Extracts relevant parameters from kwargs based on the client type + to ensure consistent tracing across different APIs. + """ + if isinstance(client, (OpenAI, Together)): + return { + "model": kwargs.get("model"), + "messages": kwargs.get("messages"), + } + # Anthropic requires additional max_tokens parameter + return { + "model": kwargs.get("model"), + "messages": kwargs.get("messages"), + "max_tokens": kwargs.get("max_tokens") + } + +def _format_output_data(client: ApiClient, response: Any) -> dict: + """Format API response data based on client type. + + Normalizes different response formats into a consistent structure + for tracing purposes. + + Returns: + dict containing: + - content: The generated text + - usage: Token usage statistics + """ + if isinstance(client, (OpenAI, Together)): + return { + "content": response.choices[0].message.content, + "usage": { + "prompt_tokens": response.usage.prompt_tokens, + "completion_tokens": response.usage.completion_tokens, + "total_tokens": response.usage.total_tokens + } + } + # Anthropic has a different response structure + return { + "content": response.content[0].text, + "usage": { + "input_tokens": response.usage.input_tokens, + "output_tokens": response.usage.output_tokens, + "total_tokens": response.usage.input_tokens + response.usage.output_tokens + } + } diff --git a/judgeval/run_evaluation.py b/judgeval/run_evaluation.py index 68790327..7bd0d38c 100644 --- a/judgeval/run_evaluation.py +++ b/judgeval/run_evaluation.py @@ -49,7 +49,7 @@ def execute_api_eval(evaluation_run: EvaluationRun) -> List[Dict]: try: # submit API request to execute evals - response = requests.post(JUDGMENT_EVAL_API_URL, json=evaluation_run.model_dump(warnings=True)) + response = requests.post(JUDGMENT_EVAL_API_URL, json=evaluation_run.model_dump(warnings=False)) response_data = response.json() except Exception as e: error(f"Error: {e}") diff --git a/tests/common/test_tracer.py b/tests/common/test_tracer.py index bfe00571..cb6abd61 100644 --- a/tests/common/test_tracer.py +++ b/tests/common/test_tracer.py @@ -2,16 +2,38 @@ import time from unittest.mock import Mock, patch, MagicMock from datetime import datetime +from uuid import uuid4 from openai import OpenAI from together import Together from anthropic import Anthropic +import requests from judgeval.common.tracer import Tracer, TraceEntry, TraceClient, wrap +from judgeval.judgment_client import JudgmentClient +from judgeval.common.exceptions import JudgmentAPIError @pytest.fixture -def tracer(): +def tracer(mocker): """Provide a configured tracer instance""" - return Tracer(api_key="test_api_key") + + # Create the mock response for trace saving (POST) + mock_post_response = mocker.Mock(spec=requests.Response) + mock_post_response.status_code = 200 + 
mock_post_response.json.return_value = { + "message": "Trace saved successfully", + "trace_id": "test-trace-id" + } + + # Create mocks for POST requests + + mock_post = mocker.patch('requests.post', autospec=True) + mock_post.return_value = mock_post_response + + # Mock the JudgmentClient + mock_judgment_client = mocker.Mock(spec=JudgmentClient) + mocker.patch('judgeval.common.tracer.JudgmentClient', return_value=mock_judgment_client) + + yield Tracer(api_key=str(uuid4())) @pytest.fixture def trace_client(tracer): @@ -19,12 +41,19 @@ def trace_client(tracer): with tracer.trace("test_trace") as client: yield client -def test_tracer_singleton(): +def test_tracer_singleton(mocker): """Test that Tracer maintains singleton pattern""" - tracer1 = Tracer(api_key="test1") - tracer2 = Tracer(api_key="test2") + # Clear any existing singleton instance first + Tracer._instance = None + + # Mock the JudgmentClient + mock_judgment_client = mocker.Mock(spec=JudgmentClient) + mocker.patch('judgeval.common.tracer.JudgmentClient', return_value=mock_judgment_client) + + tracer1 = Tracer(api_key=str(uuid4())) + tracer2 = Tracer(api_key=str(uuid4())) assert tracer1 is tracer2 - assert tracer1.api_key == "test2" # Should have new api_key + assert tracer1.api_key == tracer2.api_key def test_tracer_requires_api_key(): """Test that Tracer requires an API key""" @@ -219,3 +248,21 @@ class UnsupportedClient: with pytest.raises(ValueError): wrap(UnsupportedClient()) + +def test_tracer_invalid_api_key(mocker): + """Test that Tracer handles invalid API keys""" + # Clear the singleton instance first + Tracer._instance = None + + # Create the mock response for invalid API key + mock_post_response = mocker.Mock(spec=requests.Response) + mock_post_response.status_code = 401 + mock_post_response.json.return_value = {"detail": "API key is invalid"} + mock_post_response.raise_for_status.side_effect = requests.exceptions.HTTPError() + + # Create mock for POST request + mock_post = mocker.patch('requests.post', autospec=True) + mock_post.return_value = mock_post_response + + with pytest.raises(JudgmentAPIError, match="Issue with passed in Judgment API key: API key is invalid"): + Tracer(api_key="invalid_key")
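
Usage sketch (appendix, not part of the patch): a minimal example of how the span-level evaluation API introduced in this diff appears to fit together, pieced together from e2etests/test_tracer.py and judgeval/common/tracer.py — Tracer, wrap(), the async-aware @observe decorator, get_current_trace().async_evaluate(), and the trace() context manager. The prompt text, span/trace names, and model choice below are illustrative assumptions, not values taken from the patch.

import asyncio
import os

from openai import OpenAI

from judgeval.common.tracer import Tracer, wrap
from judgeval.constants import APIScorer

# Singleton tracer; wrap() patches chat.completions.create so the call is traced.
judgment = Tracer(api_key=os.getenv("JUDGMENT_API_KEY"))
openai_client = wrap(OpenAI())

@judgment.observe
async def answer(question: str) -> str:
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}],
    )
    output = response.choices[0].message.content
    # Span-level evaluation: attaches a ScoringResult entry to the current span.
    await judgment.get_current_trace().async_evaluate(
        input=question,
        actual_output=output,
        score_type=APIScorer.ANSWER_RELEVANCY,
        threshold=0.5,
        model="gpt-4o-mini",
        log_results=True,
    )
    return output

async def main():
    with judgment.trace("example_trace") as trace:
        result = await answer("What is the refund policy?")
        trace.save()
        trace.print()
    return result

if __name__ == "__main__":
    asyncio.run(main())

This mirrors the flow in the updated e2e test: the evaluation result is recorded against the currently open span, and the trace is saved once on entry (inside Tracer.trace, for trace_id referential integrity) and again explicitly before printing.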