fix: make consolidation API async with background task + increased timeout
Three fixes for consolidation reliability:
1. Fire-and-forget API: POST /memory/consolidate now launches consolidation
as an asyncio background task and returns immediately. The old approach
blocked until Cat's WS response, which could take 5+ minutes (LLM
extraction calls), exceeding both the WS timeout and browser fetch
timeout. Web UI now polls /memory/status to track completion.
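2. Increased timeout: the default timeout of cat_client.trigger_consolidation()
is raised from 300s to 600s and is now configurable via a new `timeout`
parameter. WS message types other than "chat" and "error" are now logged at
debug level (once per change of type) to help with debugging.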
3. Better logging: consolidation log messages are now prefixed with 🌙 so
they are easy to grep. The catch-all error handler in cat_client logs with
exc_info=True so the traceback is visible. The Web UI shows the elapsed time
while polling.
This commit is contained in:
@@ -1,5 +1,8 @@
|
|||||||
"""Cheshire Cat memory management routes."""
|
"""Cheshire Cat memory management routes."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from fastapi import APIRouter, Form
|
from fastapi import APIRouter, Form
|
||||||
from fastapi.responses import JSONResponse
|
from fastapi.responses import JSONResponse
|
||||||
@@ -88,13 +91,68 @@ async def get_episodic_memories():
|
|||||||
|
|
||||||
@router.post("/memory/consolidate")
|
@router.post("/memory/consolidate")
|
||||||
async def trigger_memory_consolidation():
|
async def trigger_memory_consolidation():
|
||||||
"""Manually trigger memory consolidation (sleep consolidation process)."""
|
"""
|
||||||
|
Trigger memory consolidation as a background task.
|
||||||
|
|
||||||
|
Returns immediately — the Web UI should poll /memory/status
|
||||||
|
to see when consolidation completes and view the result.
|
||||||
|
"""
|
||||||
from utils.cat_client import cat_adapter
|
from utils.cat_client import cat_adapter
|
||||||
logger.info("🌙 Manual memory consolidation triggered via API")
|
from utils.consolidation_scheduler import get_consolidation_status
|
||||||
result = await cat_adapter.trigger_consolidation()
|
|
||||||
if result is None:
|
# Check if already running
|
||||||
return JSONResponse(status_code=500, content={"success": False, "error": "Consolidation failed or timed out"})
|
status = get_consolidation_status()
|
||||||
return {"success": True, "result": result}
|
if status.get('is_running'):
|
||||||
|
return {"success": True, "message": "Consolidation is already running", "status": status}
|
||||||
|
|
||||||
|
logger.info("🌙 Manual memory consolidation triggered via API (background)...")
|
||||||
|
|
||||||
|
# Launch consolidation as a background task so the API returns immediately.
|
||||||
|
# The result is tracked via consolidation_scheduler's _last_consolidation state.
|
||||||
|
asyncio.create_task(_run_consolidation_background())
|
||||||
|
|
||||||
|
return {"success": True, "message": "Consolidation started in background. Check status via /memory/status"}
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_consolidation_background():
    """
    Run one manual consolidation pass and record its outcome in the
    scheduler's shared ``_last_consolidation`` state dict.

    Meant to be scheduled as a background task so the HTTP handler can
    return immediately instead of blocking for minutes.

    State keys written: ``is_running``, ``last_run``, ``total_runs``,
    ``last_result``, ``last_error`` and ``successful_runs``.

    Returns:
        None. The outcome is reported only through ``_last_consolidation``.
        Exceptions are caught, logged and stored in ``last_error``.
    """
    from utils.cat_client import cat_adapter
    from utils.consolidation_scheduler import _last_consolidation

    # trigger_consolidation() reports several failures as plain strings
    # instead of None (timeouts, closed socket, WebSocket/plugin errors).
    # Those strings are truthy, so they must be recognised explicitly or the
    # run would be counted as successful and last_error would be cleared.
    # NOTE(review): this relies on the message texts used by cat_client;
    # keep the two in sync (or have cat_client raise instead).
    failure_prefixes = (
        "Consolidation timed out",
        "Connection closed during consolidation",
        "WebSocket error:",
        "Consolidation error:",
    )

    _last_consolidation['is_running'] = True
    _last_consolidation['last_run'] = datetime.now().isoformat()
    _last_consolidation['total_runs'] += 1
    start_time = time.time()

    try:
        # Bail out early when Cat does not pass its health check; the
        # `finally` clause below resets is_running.
        if not await cat_adapter.health_check():
            logger.error("🌙 Manual consolidation aborted: Cat health check failed")
            _last_consolidation['last_error'] = 'Cat health check failed'
            return

        result = await cat_adapter.trigger_consolidation(timeout=600)
        elapsed = time.time() - start_time

        if not result:
            logger.error(f"🌙 Manual consolidation returned no result after {elapsed:.1f}s")
            _last_consolidation['last_error'] = f'No result returned after {elapsed:.1f}s (timeout or connection error)'
        elif isinstance(result, str) and result.startswith(failure_prefixes):
            # Failure reported in-band by cat_client: surface it as an error
            # so pollers of the status endpoint do not show it as success.
            logger.error(f"🌙 Manual consolidation failed after {elapsed:.1f}s: {result[:200]}")
            _last_consolidation['last_error'] = result
        else:
            logger.info(f"🌙 Manual consolidation completed in {elapsed:.1f}s: {result[:200]}")
            _last_consolidation['last_result'] = result
            _last_consolidation['last_error'] = None
            _last_consolidation['successful_runs'] += 1

    except Exception as e:
        elapsed = time.time() - start_time
        # exc_info=True keeps the traceback in the log for debugging.
        logger.error(f"🌙 Manual consolidation failed after {elapsed:.1f}s: {e}", exc_info=True)
        _last_consolidation['last_error'] = str(e)

    finally:
        # Always clear the flag, including on early return or cancellation,
        # so a later request is not rejected as "already running".
        _last_consolidation['is_running'] = False
|
||||||
|
|
||||||
|
|
||||||
@router.post("/memory/delete")
|
@router.post("/memory/delete")
|
||||||
|
|||||||
@@ -96,18 +96,54 @@ async function triggerConsolidation() {
|
|||||||
btn.disabled = true;
|
btn.disabled = true;
|
||||||
btn.textContent = '⏳ Running...';
|
btn.textContent = '⏳ Running...';
|
||||||
status.textContent = 'Consolidation in progress (this may take a few minutes)...';
|
status.textContent = 'Consolidation in progress (this may take a few minutes)...';
|
||||||
|
status.style.color = '#dcb06f';
|
||||||
resultDiv.style.display = 'none';
|
resultDiv.style.display = 'none';
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const data = await apiCall('/memory/consolidate', 'POST');
|
const data = await apiCall('/memory/consolidate', 'POST');
|
||||||
|
|
||||||
if (data.success) {
|
if (data.success) {
|
||||||
status.textContent = '✅ Consolidation complete!';
|
status.textContent = '⏳ Consolidation started — waiting for completion...';
|
||||||
status.style.color = '#6fdc6f';
|
|
||||||
resultDiv.textContent = data.result || 'Consolidation finished successfully.';
|
// Poll /memory/status until consolidation finishes
|
||||||
resultDiv.style.display = 'block';
|
const pollInterval = 5000; // 5 seconds
|
||||||
showNotification('Memory consolidation complete', 'success');
|
const maxPolls = 120; // 10 minutes max
|
||||||
refreshMemoryStats();
|
|
||||||
|
for (let i = 0; i < maxPolls; i++) {
|
||||||
|
await new Promise(r => setTimeout(r, pollInterval));
|
||||||
|
|
||||||
|
const statusData = await apiCall('/memory/status');
|
||||||
|
const cons = statusData.consolidation;
|
||||||
|
|
||||||
|
if (!cons.is_running) {
|
||||||
|
// Consolidation finished
|
||||||
|
if (cons.last_error) {
|
||||||
|
status.textContent = '❌ ' + cons.last_error;
|
||||||
|
status.style.color = '#ff6b6b';
|
||||||
|
resultDiv.textContent = 'Error: ' + cons.last_error;
|
||||||
|
resultDiv.style.display = 'block';
|
||||||
|
showNotification('Consolidation failed: ' + cons.last_error, 'error');
|
||||||
|
} else {
|
||||||
|
status.textContent = '✅ Consolidation complete!';
|
||||||
|
status.style.color = '#6fdc6f';
|
||||||
|
resultDiv.textContent = cons.last_result || 'Consolidation finished successfully.';
|
||||||
|
resultDiv.style.display = 'block';
|
||||||
|
showNotification('Memory consolidation complete', 'success');
|
||||||
|
}
|
||||||
|
refreshMemoryStats();
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// Still running — update status message
|
||||||
|
status.textContent = `⏳ Consolidation still running... (${Math.round((i + 1) * pollInterval / 1000)}s elapsed)`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we exited the loop without finishing
|
||||||
|
const finalStatus = await apiCall('/memory/status');
|
||||||
|
if (finalStatus.consolidation?.is_running) {
|
||||||
|
status.textContent = '⏳ Consolidation still running — check back later';
|
||||||
|
status.style.color = '#dcb06f';
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
status.textContent = '❌ ' + (data.error || 'Consolidation failed');
|
status.textContent = '❌ ' + (data.error || 'Consolidation failed');
|
||||||
status.style.color = '#ff6b6b';
|
status.style.color = '#ff6b6b';
|
||||||
|
|||||||
@@ -577,46 +577,51 @@ class CatAdapter:
|
|||||||
logger.error(f"Error clearing conversation history: {e}")
|
logger.error(f"Error clearing conversation history: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def trigger_consolidation(self) -> Optional[str]:
|
async def trigger_consolidation(self, timeout: int = 600) -> Optional[str]:
|
||||||
"""
|
"""
|
||||||
Trigger memory consolidation by sending a special message via WebSocket.
|
Trigger memory consolidation by sending a special message via WebSocket.
|
||||||
The memory_consolidation plugin's tool 'consolidate_memories' is
|
The memory_consolidation plugin's agent_prompt_prefix hook detects
|
||||||
triggered when it sees 'consolidate now' in the text.
|
'consolidate now' in the text and runs the consolidation synchronously.
|
||||||
Uses WebSocket with a system user ID for proper context.
|
Uses WebSocket with a system user ID for proper context.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: Max seconds to wait for the consolidation response.
|
||||||
|
Default 600 (10 min) as consolidation + LLM call can be slow.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
ws_base = self._base_url.replace("http://", "ws://").replace("https://", "wss://")
|
ws_base = self._base_url.replace("http://", "ws://").replace("https://", "wss://")
|
||||||
ws_url = f"{ws_base}/ws/system_consolidation"
|
ws_url = f"{ws_base}/ws/system_consolidation"
|
||||||
|
|
||||||
logger.info("🌙 Triggering memory consolidation via WS...")
|
logger.info(f"🌙 Triggering memory consolidation via WS (timeout={timeout}s)...")
|
||||||
|
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.ws_connect(
|
async with session.ws_connect(
|
||||||
ws_url,
|
ws_url,
|
||||||
timeout=300, # Consolidation can be very slow
|
timeout=timeout,
|
||||||
) as ws:
|
) as ws:
|
||||||
await ws.send_json({"text": "consolidate now"})
|
await ws.send_json({"text": "consolidate now"})
|
||||||
|
|
||||||
# Wait for the final chat response
|
# Wait for the final chat response
|
||||||
deadline = asyncio.get_event_loop().time() + 300
|
deadline = asyncio.get_event_loop().time() + timeout
|
||||||
|
last_type = ""
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
remaining = deadline - asyncio.get_event_loop().time()
|
remaining = deadline - asyncio.get_event_loop().time()
|
||||||
if remaining <= 0:
|
if remaining <= 0:
|
||||||
logger.error("Consolidation timed out (>300s)")
|
logger.error(f"🌙 Consolidation timed out (>{timeout}s)")
|
||||||
return "Consolidation timed out"
|
return "Consolidation timed out"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ws_msg = await asyncio.wait_for(
|
ws_msg = await asyncio.wait_for(
|
||||||
ws.receive(),
|
ws.receive(),
|
||||||
timeout=remaining
|
timeout=max(1.0, remaining)
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.error("Consolidation WS receive timeout")
|
logger.error("🌙 Consolidation WS receive timeout")
|
||||||
return "Consolidation timed out waiting for response"
|
return "Consolidation timed out waiting for response"
|
||||||
|
|
||||||
if ws_msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, aiohttp.WSMsgType.CLOSED):
|
if ws_msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, aiohttp.WSMsgType.CLOSED):
|
||||||
logger.warning("Consolidation WS closed by server")
|
logger.warning("🌙 Consolidation WS closed by server")
|
||||||
return "Connection closed during consolidation"
|
return "Connection closed during consolidation"
|
||||||
if ws_msg.type == aiohttp.WSMsgType.ERROR:
|
if ws_msg.type == aiohttp.WSMsgType.ERROR:
|
||||||
return f"WebSocket error: {ws.exception()}"
|
return f"WebSocket error: {ws.exception()}"
|
||||||
@@ -631,20 +636,24 @@ class CatAdapter:
|
|||||||
msg_type = msg.get("type", "")
|
msg_type = msg.get("type", "")
|
||||||
if msg_type == "chat":
|
if msg_type == "chat":
|
||||||
reply = msg.get("content") or msg.get("text", "")
|
reply = msg.get("content") or msg.get("text", "")
|
||||||
logger.info(f"Consolidation result: {reply[:200]}")
|
logger.info(f"🌙 Consolidation result: {reply[:200]}")
|
||||||
return reply
|
return reply
|
||||||
elif msg_type == "error":
|
elif msg_type == "error":
|
||||||
error_desc = msg.get("description", "Unknown error")
|
error_desc = msg.get("description", "Unknown error")
|
||||||
logger.error(f"Consolidation error: {error_desc}")
|
logger.error(f"🌙 Consolidation error: {error_desc}")
|
||||||
return f"Consolidation error: {error_desc}"
|
return f"Consolidation error: {error_desc}"
|
||||||
else:
|
else:
|
||||||
|
# Log unexpected message types for debugging
|
||||||
|
if msg_type != last_type:
|
||||||
|
logger.debug(f"🌙 Consolidation WS msg type: {msg_type}")
|
||||||
|
last_type = msg_type
|
||||||
continue
|
continue
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.error("Consolidation WS connection timed out")
|
logger.error("🌙 Consolidation WS connection timed out")
|
||||||
return None
|
return None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Consolidation error: {e}")
|
logger.error(f"🌙 Consolidation error: {e}", exc_info=True)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# ====================================================================
|
# ====================================================================
|
||||||
|
|||||||
Reference in New Issue
Block a user