Files
miku-discord/bot/routes/memory.py

255 lines
9.5 KiB
Python
Raw Normal View History

"""Cheshire Cat memory management routes."""
import asyncio
import time
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Form
from fastapi.responses import JSONResponse
import globals
from routes.models import MemoryDeleteRequest, MemoryEditRequest, MemoryCreateRequest
from utils.logger import get_logger
# Module-wide logger, obtained from the project logging helper under the 'api' name.
logger = get_logger('api')
# Router on which every handler in this module registers its /memory/* route.
router = APIRouter()
@router.get("/memory/status")
async def get_cat_memory_status():
    """Report the current state of the Cheshire Cat integration.

    The response combines the feature flag and configured URL read from
    ``globals``, the outcome of a live adapter health check, the adapter's
    circuit-breaker indicators, and the consolidation scheduler's status.
    """
    from utils.cat_client import cat_adapter
    from utils.consolidation_scheduler import get_consolidation_status

    # The health probe runs first; everything else is read afterwards.
    healthy = await cat_adapter.health_check()

    report = {"enabled": globals.USE_CHESHIRE_CAT, "healthy": healthy}
    report["url"] = globals.CHESHIRE_CAT_URL
    report["circuit_breaker_active"] = cat_adapter._is_circuit_broken()
    report["consecutive_failures"] = cat_adapter._consecutive_failures
    report["consolidation"] = get_consolidation_status()
    return report
@router.post("/memory/toggle")
async def toggle_cat_integration(enabled: bool = Form(...)):
    """Toggle Cheshire Cat integration on/off.

    Args:
        enabled: Form field carrying the desired state of the feature flag.

    Returns:
        A dict with ``success``, the resulting ``enabled`` flag, and a
        human-readable ``message``.
    """
    globals.USE_CHESHIRE_CAT = enabled
    logger.info(f"🐱 Cheshire Cat integration {'ENABLED' if enabled else 'DISABLED'}")

    # Persist so it survives restarts. Persistence stays best-effort: the
    # in-memory flag is already updated, so a failure here must not fail the
    # request. It is logged instead of being silently discarded, so an
    # operator can tell why the setting did not survive a restart.
    try:
        from config_manager import config_manager
        config_manager.set("memory.use_cheshire_cat", enabled, persist=True)
    except Exception as e:
        logger.warning(f"Failed to persist Cheshire Cat toggle (enabled={enabled}): {e}")

    return {
        "success": True,
        "enabled": globals.USE_CHESHIRE_CAT,
        "message": f"Cheshire Cat {'enabled' if enabled else 'disabled'}"
    }
@router.get("/memory/stats")
async def get_memory_stats():
    """Return the collection statistics reported by the Cat adapter.

    Responds with HTTP 502 when the adapter yields ``None``; otherwise the
    ``collections`` entry of the stats is returned (an empty list if absent).
    """
    from utils.cat_client import cat_adapter

    stats = await cat_adapter.get_memory_stats()
    if stats is not None:
        return {"success": True, "collections": stats.get("collections", [])}

    unreachable = {"success": False, "error": "Could not reach Cheshire Cat"}
    return JSONResponse(status_code=502, content=unreachable)
@router.get("/memory/facts")
async def get_memory_facts():
    """Get all declarative memory facts (learned knowledge about users).

    Returns:
        A dict with ``success``, the ``facts`` returned by the adapter, and
        their ``count``; or an HTTP 502 JSON error when the adapter returns
        ``None``.
    """
    from utils.cat_client import cat_adapter

    facts = await cat_adapter.get_all_facts()
    # Mirror the sibling endpoints: a None result is reported as 502 instead
    # of crashing on len(None).
    # NOTE(review): assumes the adapter may signal failure with None, as
    # get_memory_stats/get_memory_points do — confirm against cat_client.
    if facts is None:
        return JSONResponse(status_code=502, content={"success": False, "error": "Could not reach Cheshire Cat"})
    return {"success": True, "facts": facts, "count": len(facts)}
@router.get("/memory/episodic")
async def get_episodic_memories():
    """Fetch episodic memories (conversation snippets) from the Cat adapter.

    Requests the ``episodic`` collection with a limit of 100 points and
    flattens each point into ``id`` / ``content`` / ``metadata``. Responds
    with HTTP 502 when the adapter yields ``None``.
    """
    from utils.cat_client import cat_adapter

    response = await cat_adapter.get_memory_points(collection="episodic", limit=100)
    if response is None:
        unreachable = {"success": False, "error": "Could not reach Cheshire Cat"}
        return JSONResponse(status_code=502, content=unreachable)

    def flatten(point):
        # Pull the fields the UI needs out of the nested payload.
        body = point.get("payload", {})
        return {
            "id": point.get("id"),
            "content": body.get("page_content", ""),
            "metadata": body.get("metadata", {}),
        }

    memories = [flatten(point) for point in response.get("points", [])]
    return {"success": True, "memories": memories, "count": len(memories)}
# Strong references to in-flight consolidation tasks. The event loop only keeps
# weak references to tasks, so a fire-and-forget task with no other reference
# may be garbage-collected before it finishes.
_consolidation_tasks = set()


@router.post("/memory/consolidate")
async def trigger_memory_consolidation():
    """
    Trigger memory consolidation as a background task.

    Returns immediately; the Web UI should poll /memory/status
    to see when consolidation completes and view the result.

    Returns:
        A dict with ``success`` and ``message``. When a consolidation is
        already running, no new task is started and the current ``status``
        is included as well.
    """
    from utils.consolidation_scheduler import get_consolidation_status

    # Check if already running
    status = get_consolidation_status()
    if status.get('is_running'):
        return {"success": True, "message": "Consolidation is already running", "status": status}

    logger.info("🌙 Manual memory consolidation triggered via API (background)...")

    # Launch consolidation as a background task so the API returns immediately.
    # The result is tracked via consolidation_scheduler's _last_consolidation state.
    task = asyncio.create_task(_run_consolidation_background())
    # Hold the task until it completes, then drop the reference.
    _consolidation_tasks.add(task)
    task.add_done_callback(_consolidation_tasks.discard)

    return {"success": True, "message": "Consolidation started in background. Check status via /memory/status"}
async def _run_consolidation_background() -> None:
    """
    Run consolidation as a background task, updating the scheduler state.

    This prevents the API from blocking for minutes. Progress and outcome are
    written into consolidation_scheduler's ``_last_consolidation`` dict:
    ``is_running``, ``last_run`` and ``total_runs`` at start, then either
    ``last_result`` / ``successful_runs`` on success or ``last_error`` on
    failure. ``is_running`` is always reset when the coroutine exits.
    """
    from utils.cat_client import cat_adapter
    from utils.consolidation_scheduler import _last_consolidation

    # Monotonic clock: the elapsed time must not be skewed by wall-clock changes.
    start_time = time.monotonic()
    _last_consolidation['is_running'] = True
    try:
        # Bookkeeping lives inside the try so the finally below resets
        # is_running even if one of these updates fails.
        _last_consolidation['last_run'] = datetime.now().isoformat()
        _last_consolidation['total_runs'] += 1

        # Bail out early if Cat is not healthy. This is a single check,
        # not a wait/retry loop.
        if not await cat_adapter.health_check():
            logger.warning("🌙 Manual consolidation skipped: Cat health check failed")
            _last_consolidation['last_error'] = 'Cat health check failed'
            return

        result = await cat_adapter.trigger_consolidation(timeout=600)
        elapsed = time.monotonic() - start_time
        if result:
            # Record the outcome before logging. str() keeps the log preview
            # safe when the result is not a plain string.
            _last_consolidation['last_result'] = result
            _last_consolidation['last_error'] = None
            _last_consolidation['successful_runs'] += 1
            logger.info(f"🌙 Manual consolidation completed in {elapsed:.1f}s: {str(result)[:200]}")
        else:
            logger.error(f"🌙 Manual consolidation returned no result after {elapsed:.1f}s")
            _last_consolidation['last_error'] = f'No result returned after {elapsed:.1f}s (timeout or connection error)'
    except Exception as e:
        elapsed = time.monotonic() - start_time
        logger.error(f"🌙 Manual consolidation failed after {elapsed:.1f}s: {e}")
        _last_consolidation['last_error'] = str(e)
    finally:
        # Single place where the running flag is cleared, covering every exit path.
        _last_consolidation['is_running'] = False
@router.post("/memory/delete")
async def delete_all_memories(request: MemoryDeleteRequest):
    """
    Delete ALL of Miku's memories. Requires exact confirmation string.

    The confirmation field must be exactly:
    "Yes, I am deleting Miku's memories fully."

    This is destructive and irreversible.

    Returns:
        On success, a dict with ``success``, ``message`` and the outcome of
        each wipe step. HTTP 400 when the confirmation string does not match.
        HTTP 500 when the vector-memory wipe fails; that response also reports
        whether conversation history was cleared, since that step is attempted
        regardless.
    """
    REQUIRED_CONFIRMATION = "Yes, I am deleting Miku's memories fully."
    if request.confirmation != REQUIRED_CONFIRMATION:
        logger.warning("Memory deletion rejected: wrong confirmation string")
        return JSONResponse(status_code=400, content={
            "success": False,
            "error": "Confirmation string does not match. "
                     f"Expected exactly: \"{REQUIRED_CONFIRMATION}\""
        })

    from utils.cat_client import cat_adapter
    logger.warning("⚠️ MEMORY DELETION CONFIRMED — wiping all memories!")

    # Wipe vector memories (episodic + declarative)
    wipe_success = await cat_adapter.wipe_all_memories()
    # Also clear conversation history
    history_success = await cat_adapter.wipe_conversation_history()

    if not wipe_success:
        # Both steps have already run, so surface the partial state rather
        # than hiding whether the history was cleared.
        history_cleared = bool(history_success)
        logger.error(
            f"Memory wipe failed (conversation_history_cleared={history_cleared})"
        )
        return JSONResponse(status_code=500, content={
            "success": False,
            "error": "Failed to wipe memory collections. Check Cat connection.",
            "vector_memory_wiped": False,
            "conversation_history_cleared": history_cleared,
        })

    logger.warning("🗑️ All Miku memories have been deleted.")
    return {
        "success": True,
        "message": "All memories have been permanently deleted.",
        "vector_memory_wiped": wipe_success,
        "conversation_history_cleared": history_success
    }
@router.delete("/memory/point/{collection}/{point_id}")
async def delete_single_memory_point(collection: str, point_id: str):
    """Remove one memory point, addressed by its collection and point ID.

    Responds with HTTP 500 when the adapter reports the deletion as failed.
    """
    from utils.cat_client import cat_adapter

    deleted = await cat_adapter.delete_memory_point(collection, point_id)
    if not deleted:
        failure = {"success": False, "error": f"Failed to delete point {point_id}"}
        return JSONResponse(status_code=500, content=failure)

    return {"success": True, "deleted": point_id}
@router.put("/memory/point/{collection}/{point_id}")
async def edit_memory_point(collection: str, point_id: str, request: MemoryEditRequest):
    """Update the content and/or metadata of an existing memory point.

    The new ``content`` and ``metadata`` are taken from the request body and
    forwarded to the adapter. Responds with HTTP 500 when the adapter reports
    the update as failed.
    """
    from utils.cat_client import cat_adapter

    update_args = {
        "collection": collection,
        "point_id": point_id,
        "content": request.content,
        "metadata": request.metadata,
    }
    updated = await cat_adapter.update_memory_point(**update_args)
    if not updated:
        failure = {"success": False, "error": f"Failed to update point {point_id}"}
        return JSONResponse(status_code=500, content=failure)

    return {"success": True, "updated": point_id}
@router.post("/memory/create")
async def create_memory_point(request: MemoryCreateRequest):
    """
    Manually add a new memory point to the ``declarative`` or ``episodic``
    collection.

    Any other collection name is rejected with HTTP 400. Missing ``user_id``,
    ``source`` and ``metadata`` fall back to ``"manual_admin"``,
    ``"manual_web_ui"`` and an empty dict respectively. Responds with HTTP 500
    when the adapter returns a falsy result.
    """
    from utils.cat_client import cat_adapter

    target = request.collection
    if target not in ('declarative', 'episodic'):
        rejection = {"success": False, "error": "Collection must be 'declarative' or 'episodic'"}
        return JSONResponse(status_code=400, content=rejection)

    # Hand the point to the adapter, filling in defaults for optional fields.
    point_id = await cat_adapter.create_memory_point(
        collection=target,
        content=request.content,
        user_id=request.user_id or "manual_admin",
        source=request.source or "manual_web_ui",
        metadata=request.metadata or {},
    )
    if not point_id:
        failure = {"success": False, "error": "Failed to create memory point"}
        return JSONResponse(status_code=500, content=failure)

    return {"success": True, "point_id": point_id, "collection": request.collection}