diff --git a/bot/routes/memory.py b/bot/routes/memory.py index 83a6ca3..af7e65d 100644 --- a/bot/routes/memory.py +++ b/bot/routes/memory.py @@ -1,5 +1,8 @@ """Cheshire Cat memory management routes.""" +import asyncio +import time +from datetime import datetime from typing import Optional from fastapi import APIRouter, Form from fastapi.responses import JSONResponse @@ -88,13 +91,68 @@ async def get_episodic_memories(): @router.post("/memory/consolidate") async def trigger_memory_consolidation(): - """Manually trigger memory consolidation (sleep consolidation process).""" + """ + Trigger memory consolidation as a background task. + + Returns immediately — the Web UI should poll /memory/status + to see when consolidation completes and view the result. + """ from utils.cat_client import cat_adapter - logger.info("🌙 Manual memory consolidation triggered via API") - result = await cat_adapter.trigger_consolidation() - if result is None: - return JSONResponse(status_code=500, content={"success": False, "error": "Consolidation failed or timed out"}) - return {"success": True, "result": result} + from utils.consolidation_scheduler import get_consolidation_status + + # Check if already running + status = get_consolidation_status() + if status.get('is_running'): + return {"success": True, "message": "Consolidation is already running", "status": status} + + logger.info("🌙 Manual memory consolidation triggered via API (background)...") + + # Launch consolidation as a background task so the API returns immediately. + # The result is tracked via consolidation_scheduler's _last_consolidation state. + asyncio.create_task(_run_consolidation_background()) + + return {"success": True, "message": "Consolidation started in background. Check status via /memory/status"} + + +async def _run_consolidation_background(): + """ + Run consolidation as a background task, updating the scheduler state. + This prevents the API from blocking for minutes. + """ + from utils.cat_client import cat_adapter + from utils.consolidation_scheduler import _last_consolidation + + _last_consolidation['is_running'] = True + _last_consolidation['last_run'] = datetime.now().isoformat() + _last_consolidation['total_runs'] += 1 + start_time = time.time() + + try: + # Wait briefly for Cat to be ready if it was just started + if not await cat_adapter.health_check(): + _last_consolidation['last_error'] = 'Cat health check failed' + _last_consolidation['is_running'] = False + return + + result = await cat_adapter.trigger_consolidation(timeout=600) + elapsed = time.time() - start_time + + if result: + logger.info(f"🌙 Manual consolidation completed in {elapsed:.1f}s: {result[:200]}") + _last_consolidation['last_result'] = result + _last_consolidation['last_error'] = None + _last_consolidation['successful_runs'] += 1 + else: + logger.error(f"🌙 Manual consolidation returned no result after {elapsed:.1f}s") + _last_consolidation['last_error'] = f'No result returned after {elapsed:.1f}s (timeout or connection error)' + + except Exception as e: + elapsed = time.time() - start_time + logger.error(f"🌙 Manual consolidation failed after {elapsed:.1f}s: {e}") + _last_consolidation['last_error'] = str(e) + + finally: + _last_consolidation['is_running'] = False @router.post("/memory/delete") diff --git a/bot/static/js/memories.js b/bot/static/js/memories.js index 39732c3..0198ba6 100644 --- a/bot/static/js/memories.js +++ b/bot/static/js/memories.js @@ -96,18 +96,54 @@ async function triggerConsolidation() { btn.disabled = true; btn.textContent = '⏳ Running...'; status.textContent = 'Consolidation in progress (this may take a few minutes)...'; + status.style.color = '#dcb06f'; resultDiv.style.display = 'none'; try { const data = await apiCall('/memory/consolidate', 'POST'); if (data.success) { - status.textContent = '✅ Consolidation complete!'; - status.style.color = '#6fdc6f'; - resultDiv.textContent = data.result || 'Consolidation finished successfully.'; - resultDiv.style.display = 'block'; - showNotification('Memory consolidation complete', 'success'); - refreshMemoryStats(); + status.textContent = '⏳ Consolidation started — waiting for completion...'; + + // Poll /memory/status until consolidation finishes + const pollInterval = 5000; // 5 seconds + const maxPolls = 120; // 10 minutes max + + for (let i = 0; i < maxPolls; i++) { + await new Promise(r => setTimeout(r, pollInterval)); + + const statusData = await apiCall('/memory/status'); + const cons = statusData.consolidation; + + if (!cons.is_running) { + // Consolidation finished + if (cons.last_error) { + status.textContent = '❌ ' + cons.last_error; + status.style.color = '#ff6b6b'; + resultDiv.textContent = 'Error: ' + cons.last_error; + resultDiv.style.display = 'block'; + showNotification('Consolidation failed: ' + cons.last_error, 'error'); + } else { + status.textContent = '✅ Consolidation complete!'; + status.style.color = '#6fdc6f'; + resultDiv.textContent = cons.last_result || 'Consolidation finished successfully.'; + resultDiv.style.display = 'block'; + showNotification('Memory consolidation complete', 'success'); + } + refreshMemoryStats(); + break; + } else { + // Still running — update status message + status.textContent = `⏳ Consolidation still running... (${Math.round((i + 1) * pollInterval / 1000)}s elapsed)`; + } + } + + // If we exited the loop without finishing + const finalStatus = await apiCall('/memory/status'); + if (finalStatus.consolidation?.is_running) { + status.textContent = '⏳ Consolidation still running — check back later'; + status.style.color = '#dcb06f'; + } } else { status.textContent = '❌ ' + (data.error || 'Consolidation failed'); status.style.color = '#ff6b6b'; diff --git a/bot/utils/cat_client.py b/bot/utils/cat_client.py index b0d297b..2a0d57d 100644 --- a/bot/utils/cat_client.py +++ b/bot/utils/cat_client.py @@ -577,46 +577,51 @@ class CatAdapter: logger.error(f"Error clearing conversation history: {e}") return False - async def trigger_consolidation(self) -> Optional[str]: + async def trigger_consolidation(self, timeout: int = 600) -> Optional[str]: """ Trigger memory consolidation by sending a special message via WebSocket. - The memory_consolidation plugin's tool 'consolidate_memories' is - triggered when it sees 'consolidate now' in the text. + The memory_consolidation plugin's agent_prompt_prefix hook detects + 'consolidate now' in the text and runs the consolidation synchronously. Uses WebSocket with a system user ID for proper context. + + Args: + timeout: Max seconds to wait for the consolidation response. + Default 600 (10 min) as consolidation + LLM call can be slow. """ try: ws_base = self._base_url.replace("http://", "ws://").replace("https://", "wss://") ws_url = f"{ws_base}/ws/system_consolidation" - logger.info("🌙 Triggering memory consolidation via WS...") + logger.info(f"🌙 Triggering memory consolidation via WS (timeout={timeout}s)...") async with aiohttp.ClientSession() as session: async with session.ws_connect( ws_url, - timeout=300, # Consolidation can be very slow + timeout=timeout, ) as ws: await ws.send_json({"text": "consolidate now"}) # Wait for the final chat response - deadline = asyncio.get_event_loop().time() + 300 + deadline = asyncio.get_event_loop().time() + timeout + last_type = "" while True: remaining = deadline - asyncio.get_event_loop().time() if remaining <= 0: - logger.error("Consolidation timed out (>300s)") + logger.error(f"🌙 Consolidation timed out (>{timeout}s)") return "Consolidation timed out" try: ws_msg = await asyncio.wait_for( ws.receive(), - timeout=remaining + timeout=max(1.0, remaining) ) except asyncio.TimeoutError: - logger.error("Consolidation WS receive timeout") + logger.error("🌙 Consolidation WS receive timeout") return "Consolidation timed out waiting for response" if ws_msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, aiohttp.WSMsgType.CLOSED): - logger.warning("Consolidation WS closed by server") + logger.warning("🌙 Consolidation WS closed by server") return "Connection closed during consolidation" if ws_msg.type == aiohttp.WSMsgType.ERROR: return f"WebSocket error: {ws.exception()}" @@ -631,20 +636,24 @@ class CatAdapter: msg_type = msg.get("type", "") if msg_type == "chat": reply = msg.get("content") or msg.get("text", "") - logger.info(f"Consolidation result: {reply[:200]}") + logger.info(f"🌙 Consolidation result: {reply[:200]}") return reply elif msg_type == "error": error_desc = msg.get("description", "Unknown error") - logger.error(f"Consolidation error: {error_desc}") + logger.error(f"🌙 Consolidation error: {error_desc}") return f"Consolidation error: {error_desc}" else: + # Log unexpected message types for debugging + if msg_type != last_type: + logger.debug(f"🌙 Consolidation WS msg type: {msg_type}") + last_type = msg_type continue except asyncio.TimeoutError: - logger.error("Consolidation WS connection timed out") + logger.error("🌙 Consolidation WS connection timed out") return None except Exception as e: - logger.error(f"Consolidation error: {e}") + logger.error(f"🌙 Consolidation error: {e}", exc_info=True) return None # ====================================================================