- 217 error returns across 18 route files + api.py now use JSONResponse with appropriate HTTP status codes instead of returning HTTP 200 - Status code distribution: 500 (121), 400 (39), 503 (28), 404 (24), 409 (3), 502 (2) - Fixed language.py tuple-return bug (was serializing as JSON array) - Fixed bare except clauses in bipolar_mode.py and voice.py - Body-level error schemas preserved (status/error + success/error patterns) so web UI continues working without changes - chat.py (SSE) unchanged: errors sent within stream protocol - All 170 tests pass
195 lines
7.2 KiB
Python
195 lines
7.2 KiB
Python
"""Cheshire Cat memory management routes."""
|
|
|
|
from typing import Optional
|
|
from fastapi import APIRouter, Form
|
|
from fastapi.responses import JSONResponse
|
|
import globals
|
|
from routes.models import MemoryDeleteRequest, MemoryEditRequest, MemoryCreateRequest
|
|
from utils.logger import get_logger
|
|
|
|
logger = get_logger('api')
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/memory/status")
|
|
async def get_cat_memory_status():
|
|
"""Get Cheshire Cat connection status and feature flag."""
|
|
from utils.cat_client import cat_adapter
|
|
is_healthy = await cat_adapter.health_check()
|
|
return {
|
|
"enabled": globals.USE_CHESHIRE_CAT,
|
|
"healthy": is_healthy,
|
|
"url": globals.CHESHIRE_CAT_URL,
|
|
"circuit_breaker_active": cat_adapter._is_circuit_broken(),
|
|
"consecutive_failures": cat_adapter._consecutive_failures
|
|
}
|
|
|
|
|
|
@router.post("/memory/toggle")
|
|
async def toggle_cat_integration(enabled: bool = Form(...)):
|
|
"""Toggle Cheshire Cat integration on/off."""
|
|
globals.USE_CHESHIRE_CAT = enabled
|
|
logger.info(f"🐱 Cheshire Cat integration {'ENABLED' if enabled else 'DISABLED'}")
|
|
|
|
# Persist so it survives restarts
|
|
try:
|
|
from config_manager import config_manager
|
|
config_manager.set("memory.use_cheshire_cat", enabled, persist=True)
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"success": True,
|
|
"enabled": globals.USE_CHESHIRE_CAT,
|
|
"message": f"Cheshire Cat {'enabled' if enabled else 'disabled'}"
|
|
}
|
|
|
|
|
|
@router.get("/memory/stats")
|
|
async def get_memory_stats():
|
|
"""Get memory collection statistics from Cheshire Cat (point counts per collection)."""
|
|
from utils.cat_client import cat_adapter
|
|
stats = await cat_adapter.get_memory_stats()
|
|
if stats is None:
|
|
return JSONResponse(status_code=502, content={"success": False, "error": "Could not reach Cheshire Cat"})
|
|
return {"success": True, "collections": stats.get("collections", [])}
|
|
|
|
|
|
@router.get("/memory/facts")
|
|
async def get_memory_facts():
|
|
"""Get all declarative memory facts (learned knowledge about users)."""
|
|
from utils.cat_client import cat_adapter
|
|
facts = await cat_adapter.get_all_facts()
|
|
return {"success": True, "facts": facts, "count": len(facts)}
|
|
|
|
|
|
@router.get("/memory/episodic")
|
|
async def get_episodic_memories():
|
|
"""Get all episodic memories (conversation snippets)."""
|
|
from utils.cat_client import cat_adapter
|
|
result = await cat_adapter.get_memory_points(collection="episodic", limit=100)
|
|
if result is None:
|
|
return JSONResponse(status_code=502, content={"success": False, "error": "Could not reach Cheshire Cat"})
|
|
|
|
memories = []
|
|
for point in result.get("points", []):
|
|
payload = point.get("payload", {})
|
|
memories.append({
|
|
"id": point.get("id"),
|
|
"content": payload.get("page_content", ""),
|
|
"metadata": payload.get("metadata", {}),
|
|
})
|
|
|
|
return {"success": True, "memories": memories, "count": len(memories)}
|
|
|
|
|
|
@router.post("/memory/consolidate")
|
|
async def trigger_memory_consolidation():
|
|
"""Manually trigger memory consolidation (sleep consolidation process)."""
|
|
from utils.cat_client import cat_adapter
|
|
logger.info("🌙 Manual memory consolidation triggered via API")
|
|
result = await cat_adapter.trigger_consolidation()
|
|
if result is None:
|
|
return JSONResponse(status_code=500, content={"success": False, "error": "Consolidation failed or timed out"})
|
|
return {"success": True, "result": result}
|
|
|
|
|
|
@router.post("/memory/delete")
|
|
async def delete_all_memories(request: MemoryDeleteRequest):
|
|
"""
|
|
Delete ALL of Miku's memories. Requires exact confirmation string.
|
|
|
|
The confirmation field must be exactly:
|
|
"Yes, I am deleting Miku's memories fully."
|
|
|
|
This is destructive and irreversible.
|
|
"""
|
|
REQUIRED_CONFIRMATION = "Yes, I am deleting Miku's memories fully."
|
|
|
|
if request.confirmation != REQUIRED_CONFIRMATION:
|
|
logger.warning(f"Memory deletion rejected: wrong confirmation string")
|
|
return JSONResponse(status_code=400, content={
|
|
"success": False,
|
|
"error": "Confirmation string does not match. "
|
|
f"Expected exactly: \"{REQUIRED_CONFIRMATION}\""
|
|
})
|
|
|
|
from utils.cat_client import cat_adapter
|
|
logger.warning("⚠️ MEMORY DELETION CONFIRMED — wiping all memories!")
|
|
|
|
# Wipe vector memories (episodic + declarative)
|
|
wipe_success = await cat_adapter.wipe_all_memories()
|
|
|
|
# Also clear conversation history
|
|
history_success = await cat_adapter.wipe_conversation_history()
|
|
|
|
if wipe_success:
|
|
logger.warning("🗑️ All Miku memories have been deleted.")
|
|
return {
|
|
"success": True,
|
|
"message": "All memories have been permanently deleted.",
|
|
"vector_memory_wiped": wipe_success,
|
|
"conversation_history_cleared": history_success
|
|
}
|
|
else:
|
|
return JSONResponse(status_code=500, content={
|
|
"success": False,
|
|
"error": "Failed to wipe memory collections. Check Cat connection."
|
|
})
|
|
|
|
|
|
@router.delete("/memory/point/{collection}/{point_id}")
|
|
async def delete_single_memory_point(collection: str, point_id: str):
|
|
"""Delete a single memory point by collection and ID."""
|
|
from utils.cat_client import cat_adapter
|
|
success = await cat_adapter.delete_memory_point(collection, point_id)
|
|
if success:
|
|
return {"success": True, "deleted": point_id}
|
|
else:
|
|
return JSONResponse(status_code=500, content={"success": False, "error": f"Failed to delete point {point_id}"})
|
|
|
|
|
|
@router.put("/memory/point/{collection}/{point_id}")
|
|
async def edit_memory_point(collection: str, point_id: str, request: MemoryEditRequest):
|
|
"""Edit an existing memory point's content and/or metadata."""
|
|
from utils.cat_client import cat_adapter
|
|
success = await cat_adapter.update_memory_point(
|
|
collection=collection,
|
|
point_id=point_id,
|
|
content=request.content,
|
|
metadata=request.metadata
|
|
)
|
|
if success:
|
|
return {"success": True, "updated": point_id}
|
|
else:
|
|
return JSONResponse(status_code=500, content={"success": False, "error": f"Failed to update point {point_id}"})
|
|
|
|
|
|
@router.post("/memory/create")
|
|
async def create_memory_point(request: MemoryCreateRequest):
|
|
"""
|
|
Manually create a new memory (declarative fact or episodic memory).
|
|
|
|
For declarative facts, this allows you to teach Miku new knowledge.
|
|
For episodic memories, this allows you to inject conversation context.
|
|
"""
|
|
from utils.cat_client import cat_adapter
|
|
|
|
if request.collection not in ['declarative', 'episodic']:
|
|
return JSONResponse(status_code=400, content={"success": False, "error": "Collection must be 'declarative' or 'episodic'"})
|
|
|
|
# Create the memory point
|
|
result = await cat_adapter.create_memory_point(
|
|
collection=request.collection,
|
|
content=request.content,
|
|
user_id=request.user_id or "manual_admin",
|
|
source=request.source or "manual_web_ui",
|
|
metadata=request.metadata or {}
|
|
)
|
|
|
|
if result:
|
|
return {"success": True, "point_id": result, "collection": request.collection}
|
|
else:
|
|
return JSONResponse(status_code=500, content={"success": False, "error": "Failed to create memory point"})
|