16
16
from judgeval .data .evaluation_run import EvaluationRun
17
17
from judgeval .utils .async_utils import safe_run_async
18
18
from judgeval .scorers .score import a_execute_scoring
19
+ from judgeval .api import JudgmentSyncClient
20
+ from judgeval .env import JUDGMENT_API_KEY , JUDGMENT_ORG_ID
19
21
20
22
21
23
class LocalEvaluationQueue :
@@ -37,6 +39,10 @@ def __init__(
37
39
self ._num_workers = num_workers # Number of worker threads
38
40
self ._worker_threads : List [threading .Thread ] = []
39
41
self ._shutdown_event = threading .Event ()
42
+ self ._api_client = JudgmentSyncClient (
43
+ api_key = JUDGMENT_API_KEY ,
44
+ organization_id = JUDGMENT_ORG_ID ,
45
+ )
40
46
41
47
def enqueue (self , evaluation_run : EvaluationRun ) -> None :
42
48
"""Add evaluation run to the queue."""
@@ -83,13 +89,8 @@ def run_all(
83
89
84
90
def start_workers (
85
91
self ,
86
- callback : Optional [Callable [[EvaluationRun , List [ScoringResult ]], None ]] = None ,
87
92
) -> List [threading .Thread ]:
88
93
"""Start multiple background threads to process runs in parallel.
89
-
90
- Args:
91
- callback: Optional function called after each run with (run, results).
92
-
93
94
Returns:
94
95
List of started worker threads.
95
96
"""
@@ -107,8 +108,10 @@ def _worker(worker_id: int) -> None:
107
108
108
109
try :
109
110
results = self ._process_run (run )
110
- if callback :
111
- callback (run , results )
111
+ results_dict = [result .model_dump () for result in results ]
112
+ self ._api_client .log_eval_results (
113
+ payload = {"results" : results_dict , "run" : run .model_dump ()}
114
+ )
112
115
except Exception as exc :
113
116
judgeval_logger .error (
114
117
f"Worker { worker_id } error processing { run .eval_name } : { exc } "
0 commit comments