# utils/consolidation_scheduler.py
"""
Nightly memory consolidation scheduler.

Runs the Cheshire Cat memory consolidation pipeline on a fixed schedule
(4:00 AM UTC by default) to mimic human REM sleep consolidation:
  1. Query all unconsolidated episodic memories from Qdrant
  2. Classify and delete trivial messages
  3. Mark kept memories as consolidated
  4. Extract declarative facts per user via LLM
  5. Store facts in the declarative memory collection
"""

import asyncio
import time
from datetime import datetime

import globals
from utils.logger import get_logger

logger = get_logger('consolidation')

# Tracks the last consolidation run time and result.
# Mutated in place by run_nightly_consolidation(); read via
# get_consolidation_status().
_last_consolidation = {
    'last_run': None,       # ISO timestamp of last run
    'last_result': None,    # Result string from Cat
    'last_error': None,     # Error message if last run failed
    'is_running': False,    # Whether consolidation is currently in progress
    'total_runs': 0,        # Total number of consolidation runs
    'successful_runs': 0,   # Number of successful runs
}


def get_consolidation_status() -> dict:
    """Return the current consolidation status for the Web UI.

    Returns:
        A new dict with the keys ``last_run``, ``last_result``,
        ``last_error``, ``is_running``, ``total_runs`` and
        ``successful_runs``, copied from the module-level tracking state.
        Because a fresh dict is built on every call, callers cannot mutate
        the tracker through the returned value.
    """
    return {
        'last_run': _last_consolidation['last_run'],
        'last_result': _last_consolidation['last_result'],
        'last_error': _last_consolidation['last_error'],
        'is_running': _last_consolidation['is_running'],
        'total_runs': _last_consolidation['total_runs'],
        'successful_runs': _last_consolidation['successful_runs'],
    }


async def run_nightly_consolidation():
    """
    Run the nightly memory consolidation process.

    This is the entry point called by APScheduler. It:
    1. Skips if consolidation is already running
    2. Skips if Cheshire Cat is disabled
    3. Checks that Cat is healthy
    4. Triggers consolidation through ``cat_adapter`` and awaits the result
    5. Logs the result and updates tracking state

    The two "skip" paths (already running / Cat disabled) return before any
    tracking state is touched and are not counted as runs. Every attempt
    that gets past them is counted exactly once in ``total_runs``, whether
    it succeeds, fails the health check, returns no result, or raises.

    Returns:
        None. Outcomes are reported through the logger and the state
        exposed by ``get_consolidation_status()``.
    """
    # Prevent overlapping runs
    if _last_consolidation['is_running']:
        logger.warning("🌙 Consolidation already running, skipping scheduled run")
        return

    if not globals.USE_CHESHIRE_CAT:
        logger.info("🌙 Skipping scheduled consolidation: Cheshire Cat is disabled")
        return

    _last_consolidation['is_running'] = True
    # NOTE(review): this is a naive local-time timestamp while the schedule
    # is described in UTC — confirm what the Web UI expects before changing.
    _last_consolidation['last_run'] = datetime.now().isoformat()
    start_time = time.time()

    try:
        # Imported lazily inside the function rather than at module level
        # (presumably to avoid an import cycle — verify before hoisting).
        from utils.cat_client import cat_adapter

        # Check Cat health before attempting
        if not await cat_adapter.health_check():
            logger.warning("🌙 Skipping scheduled consolidation: Cat is not healthy")
            _last_consolidation['last_error'] = 'Cat health check failed'
            # Do NOT bump total_runs or clear is_running here: the
            # ``finally`` clause below also runs on this return, and doing
            # the bookkeeping in both places counted this attempt twice.
            return

        logger.info("🌙 Starting nightly memory consolidation (scheduled)...")

        # Trigger consolidation via WebSocket
        # This runs synchronously within the hook and can take several minutes
        result = await cat_adapter.trigger_consolidation()

        elapsed = time.time() - start_time

        if result:
            logger.info(f"🌙 Nightly consolidation completed in {elapsed:.1f}s: {result[:200]}")
            _last_consolidation['last_result'] = result
            _last_consolidation['last_error'] = None
            _last_consolidation['successful_runs'] += 1
        else:
            # A falsy result leaves last_result untouched so the UI keeps
            # showing the most recent successful output.
            logger.error(f"🌙 Nightly consolidation returned no result after {elapsed:.1f}s")
            _last_consolidation['last_error'] = 'No result returned (timeout or connection error)'

    except Exception as e:
        # Boundary handler: a scheduled job must not propagate failures,
        # so record the error for the status endpoint instead.
        elapsed = time.time() - start_time
        logger.error(f"🌙 Nightly consolidation failed after {elapsed:.1f}s: {e}")
        _last_consolidation['last_error'] = str(e)
    finally:
        # Single place where an attempt is counted and the running flag is
        # released — reached on success, early return, and exceptions alike.
        _last_consolidation['total_runs'] += 1
        _last_consolidation['is_running'] = False