Files
miku-discord/bot/utils/consolidation_scheduler.py

106 lines
3.8 KiB
Python
Raw Permalink Normal View History

# utils/consolidation_scheduler.py
"""
Nightly memory consolidation scheduler.
Runs the Cheshire Cat memory consolidation pipeline on a fixed schedule
(4:00 AM UTC by default) to mimic human REM sleep consolidation:
1. Query all unconsolidated episodic memories from Qdrant
2. Classify and delete trivial messages
3. Mark kept memories as consolidated
4. Extract declarative facts per user via LLM
5. Store facts in the declarative memory collection
"""
import asyncio
import time
from datetime import datetime
import globals
from utils.logger import get_logger
logger = get_logger('consolidation')
# Tracks the last consolidation run time and result
_last_consolidation = {
'last_run': None, # ISO timestamp of last run
'last_result': None, # Result string from Cat
'last_error': None, # Error message if last run failed
'is_running': False, # Whether consolidation is currently in progress
'total_runs': 0, # Total number of consolidation runs
'successful_runs': 0, # Number of successful runs
}
def get_consolidation_status() -> dict:
"""Return the current consolidation status for the Web UI."""
return {
'last_run': _last_consolidation['last_run'],
'last_result': _last_consolidation['last_result'],
'last_error': _last_consolidation['last_error'],
'is_running': _last_consolidation['is_running'],
'total_runs': _last_consolidation['total_runs'],
'successful_runs': _last_consolidation['successful_runs'],
}
async def run_nightly_consolidation():
"""
Run the nightly memory consolidation process.
This is the entry point called by APScheduler. It:
1. Checks if Cheshire Cat is enabled and healthy
2. Skips if consolidation is already running
3. Sends a 'consolidate now' message via WebSocket to Cat
4. Logs the result and updates tracking state
"""
# Prevent overlapping runs
if _last_consolidation['is_running']:
logger.warning("🌙 Consolidation already running, skipping scheduled run")
return
if not globals.USE_CHESHIRE_CAT:
logger.info("🌙 Skipping scheduled consolidation: Cheshire Cat is disabled")
return
_last_consolidation['is_running'] = True
_last_consolidation['last_run'] = datetime.now().isoformat()
start_time = time.time()
try:
from utils.cat_client import cat_adapter
# Check Cat health before attempting
if not await cat_adapter.health_check():
logger.warning("🌙 Skipping scheduled consolidation: Cat is not healthy")
_last_consolidation['last_error'] = 'Cat health check failed'
_last_consolidation['is_running'] = False
_last_consolidation['total_runs'] += 1
return
logger.info("🌙 Starting nightly memory consolidation (scheduled)...")
# Trigger consolidation via WebSocket
# This runs synchronously within the hook and can take several minutes
result = await cat_adapter.trigger_consolidation()
elapsed = time.time() - start_time
if result:
logger.info(f"🌙 Nightly consolidation completed in {elapsed:.1f}s: {result[:200]}")
_last_consolidation['last_result'] = result
_last_consolidation['last_error'] = None
_last_consolidation['successful_runs'] += 1
else:
logger.error(f"🌙 Nightly consolidation returned no result after {elapsed:.1f}s")
_last_consolidation['last_error'] = 'No result returned (timeout or connection error)'
except Exception as e:
elapsed = time.time() - start_time
logger.error(f"🌙 Nightly consolidation failed after {elapsed:.1f}s: {e}")
_last_consolidation['last_error'] = str(e)
finally:
_last_consolidation['total_runs'] += 1
_last_consolidation['is_running'] = False