@@ -313,10 +313,14 @@ def export_conversation(self, session_id: str) -> Optional[Dict[str, Any]]:
313
313
return conversation .model_dump ()
314
314
315
315
def _load_conversation_from_db (self , session_id : str ) -> Optional [Dict [str , Any ]]:
316
- """Load conversation from database (synchronous wrapper)"""
316
+ """Load conversation from normalized database (synchronous wrapper)"""
317
317
try :
318
318
import asyncio
319
- from app .database .accessor .conversation import load_conversation
319
+ from app .database .accessor .conversation import load_full_conversation
320
+
321
+ # Use fallback values for synchronous call
322
+ user_id = "unknown_user"
323
+ merchant_id = "unknown_merchant"
320
324
321
325
# Get the current event loop or create one if needed
322
326
try :
@@ -326,61 +330,164 @@ def _load_conversation_from_db(self, session_id: str) -> Optional[Dict[str, Any]
326
330
# This is a fallback for synchronous calls from async contexts
327
331
return None
328
332
else :
329
- return loop .run_until_complete (load_conversation (session_id ))
333
+ return loop .run_until_complete (load_full_conversation (session_id , user_id , merchant_id ))
330
334
except RuntimeError :
331
335
# No event loop in current thread, create a new one
332
- return asyncio .run (load_conversation (session_id ))
336
+ return asyncio .run (load_full_conversation (session_id , user_id , merchant_id ))
333
337
334
338
except Exception as e :
335
- logger .error (f"Failed to load conversation from database for session { session_id } : { e } " )
339
+ logger .error (f"Failed to load conversation from normalized database for session { session_id } : { e } " )
336
340
return None
337
341
338
- async def _save_conversation_to_db (self , conversation : 'ConversationDebugData' ):
339
- """Save conversation to database"""
342
+ async def _save_conversation_to_db (self , conversation : 'ConversationDebugData' , user_id : str = None , merchant_id : str = None ):
343
+ """Save conversation to normalized database structure with user authorization """
340
344
try :
341
- from app .database .accessor .conversation import save_conversation
342
-
343
- conversation .update_summary ()
344
- conversation_data = conversation .model_dump ()
345
-
346
- success = await save_conversation (
347
- session_id = conversation .session_id ,
348
- conversation_id = conversation .conversation_id ,
349
- conversation_data = conversation_data
345
+ from app .database .accessor .conversation import (
346
+ create_conversation , get_conversation_by_session , save_message , save_tool_call
350
347
)
351
348
352
- if success :
353
- logger .debug (f"Saved conversation { conversation .session_id } to database" )
349
+ # Use fallback values if user context not provided
350
+ if not user_id :
351
+ user_id = "unknown_user" # This should be improved to get from session context
352
+ if not merchant_id :
353
+ merchant_id = "unknown_merchant" # This should be improved to get from session context
354
+
355
+ # Check if conversation already exists in database
356
+ existing_conv = await get_conversation_by_session (conversation .session_id , user_id , merchant_id )
357
+
358
+ if not existing_conv :
359
+ # Create new conversation record
360
+ conversation_uuid = await create_conversation (
361
+ session_id = conversation .session_id ,
362
+ user_id = user_id ,
363
+ merchant_id = merchant_id ,
364
+ title = conversation .metadata .get ('title' ),
365
+ metadata = conversation .metadata
366
+ )
367
+ if not conversation_uuid :
368
+ logger .error (f"Failed to create conversation record for { conversation .session_id } " )
369
+ return
354
370
else :
355
- logger .warning (f"Failed to save conversation { conversation .session_id } to database" )
371
+ conversation_uuid = existing_conv ['id' ]
372
+
373
+ # Save all messages and tool calls for each turn
374
+ for turn in conversation .turns :
375
+ if not turn .status == "completed" :
376
+ continue # Only save completed turns
377
+
378
+ # Save user message if exists
379
+ if turn .user_message :
380
+ user_msg_uuid = await save_message (
381
+ conversation_id = conversation_uuid ,
382
+ turn_number = turn .turn_number ,
383
+ role = "user" ,
384
+ content = turn .user_message .content ,
385
+ metadata = {"message_id" : turn .user_message .id , "timestamp" : turn .user_message .timestamp .isoformat ()}
386
+ )
387
+
388
+ # Save assistant message if exists
389
+ if turn .assistant_response :
390
+ assistant_msg_uuid = await save_message (
391
+ conversation_id = conversation_uuid ,
392
+ turn_number = turn .turn_number ,
393
+ role = "assistant" ,
394
+ content = turn .assistant_response .content ,
395
+ metadata = {"message_id" : turn .assistant_response .id , "timestamp" : turn .assistant_response .timestamp .isoformat ()}
396
+ )
397
+
398
+ # Save tool calls for this assistant message
399
+ for tool_call in turn .tool_calls :
400
+ if assistant_msg_uuid :
401
+ tool_call_uuid = await save_tool_call (
402
+ message_id = assistant_msg_uuid ,
403
+ tool_call_id = tool_call .id ,
404
+ function_name = tool_call .function_name ,
405
+ arguments = tool_call .arguments
406
+ )
407
+
408
+ logger .debug (f"Saved conversation { conversation .session_id } to normalized database (user: { user_id } , merchant: { merchant_id } )" )
356
409
357
410
except Exception as e :
358
- logger .error (f"Error saving conversation to database: { e } " )
411
+ logger .error (f"Error saving conversation to normalized database: { e } " )
359
412
360
- async def load_conversation_from_db (self , session_id : str ) -> Optional ['ConversationDebugData' ]:
361
- """Load conversation from database and restore to memory"""
413
+ async def load_conversation_from_db (self , session_id : str , user_id : str = None , merchant_id : str = None ) -> Optional ['ConversationDebugData' ]:
414
+ """Load conversation from normalized database structure with user authorization and restore to memory"""
362
415
try :
363
- from app .database .accessor .conversation import load_conversation
364
- from .services .conversation import ConversationDebugData
416
+ from app .database .accessor .conversation import load_full_conversation
417
+ from .services .conversation import ConversationDebugData , ConversationTurn , ConversationMessage , ToolCall
418
+
419
+ # Use fallback values if user context not provided
420
+ if not user_id :
421
+ user_id = "unknown_user" # This should be improved to get from session context
422
+ if not merchant_id :
423
+ merchant_id = "unknown_merchant" # This should be improved to get from session context
365
424
366
- conversation_data = await load_conversation (session_id )
425
+ conversation_data = await load_full_conversation (session_id , user_id , merchant_id )
367
426
if not conversation_data :
368
- logger .debug (f"No conversation found in database for session { session_id } " )
427
+ logger .debug (f"No authorized conversation found in database for session { session_id } (user: { user_id } , merchant: { merchant_id } ) " )
369
428
return None
370
429
371
- # Reconstruct ConversationDebugData from stored data
372
- conversation = ConversationDebugData .model_validate (conversation_data )
430
+ # Reconstruct ConversationDebugData from normalized database structure
431
+ conversation = ConversationDebugData (
432
+ session_id = session_id ,
433
+ conversation_id = conversation_data ['id' ],
434
+ metadata = {'loaded_from_database' : True }
435
+ )
436
+
437
+ # Reconstruct turns from the normalized data
438
+ for turn_data in conversation_data .get ('turns' , []):
439
+ turn_messages = turn_data ['messages' ]
440
+ turn_number = turn_data ['turn_number' ]
441
+
442
+ # Find user and assistant messages
443
+ user_message = None
444
+ assistant_message = None
445
+ tool_calls = []
446
+
447
+ for msg in turn_messages :
448
+ if msg ['role' ] == 'user' :
449
+ user_message = ConversationMessage (
450
+ id = msg .get ('id' , f"user_{ turn_number } " ),
451
+ role = 'user' ,
452
+ content = msg ['content' ],
453
+ timestamp = msg ['timestamp' ]
454
+ )
455
+ elif msg ['role' ] == 'assistant' :
456
+ assistant_message = ConversationMessage (
457
+ id = msg .get ('id' , f"assistant_{ turn_number } " ),
458
+ role = 'assistant' ,
459
+ content = msg ['content' ],
460
+ timestamp = msg ['timestamp' ]
461
+ )
462
+
463
+ # Add tool calls for this assistant message
464
+ for tool_call_data in msg .get ('tool_calls' , []):
465
+ tool_call = ToolCall (
466
+ id = tool_call_data ['tool_call_id' ],
467
+ function_name = tool_call_data ['function_name' ],
468
+ arguments = tool_call_data ['arguments' ]
469
+ )
470
+ tool_calls .append (tool_call )
471
+
472
+ # Create and add the turn
473
+ if user_message or assistant_message :
474
+ turn = conversation .start_new_turn (user_message )
475
+ turn .turn_number = turn_number
476
+ if assistant_message :
477
+ turn .assistant_response = assistant_message
478
+ turn .tool_calls = tool_calls
479
+ turn .complete_turn ("completed" )
373
480
374
481
# Store in memory cache
375
482
with self ._lock :
376
483
self ._conversations [session_id ] = conversation
377
484
self ._session_access_times [session_id ] = time .time ()
378
485
379
- logger .info (f"Loaded conversation { session_id } from database with { len (conversation .turns )} turns" )
486
+ logger .info (f"Loaded authorized conversation { session_id } from normalized database with { len (conversation .turns )} turns (user: { user_id } , merchant: { merchant_id } ) " )
380
487
return conversation
381
488
382
489
except Exception as e :
383
- logger .error (f"Failed to load conversation from database for session { session_id } : { e } " )
490
+ logger .error (f"Failed to load authorized conversation from normalized database for session { session_id } : { e } " )
384
491
return None
385
492
386
493
def get_session_stats (self ) -> Dict [str , Any ]:
0 commit comments