Step 2 of memory system overhaul: automated scheduling. - New consolidation_scheduler.py: run_nightly_consolidation() function that checks Cat health, triggers consolidation via WebSocket, and tracks run history with success/failure stats - bot.py on_ready: register APScheduler cron job (hour=4, minute=0) alongside the existing daily DM analysis job - routes/memory.py: expose consolidation status (last_run, last_result, last_error, is_running, total_runs, successful_runs) in the /memory/status API response - Web UI: show consolidation schedule info (last run time, success/fail, run counts) below the manual consolidate button, with 'running now' indicator when active The 'sleep consolidation' metaphor is now actually automated instead of being manual-only.
106 lines
3.8 KiB
Python
106 lines
3.8 KiB
Python
# utils/consolidation_scheduler.py
|
|
"""
|
|
Nightly memory consolidation scheduler.
|
|
|
|
Runs the Cheshire Cat memory consolidation pipeline on a fixed schedule
|
|
(4:00 AM UTC by default) to mimic human REM sleep consolidation:
|
|
|
|
1. Query all unconsolidated episodic memories from Qdrant
|
|
2. Classify and delete trivial messages
|
|
3. Mark kept memories as consolidated
|
|
4. Extract declarative facts per user via LLM
|
|
5. Store facts in the declarative memory collection
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import globals
|
|
from utils.logger import get_logger
|
|
|
|
logger = get_logger('consolidation')
|
|
|
|
# Tracks the last consolidation run time and result
|
|
_last_consolidation = {
|
|
'last_run': None, # ISO timestamp of last run
|
|
'last_result': None, # Result string from Cat
|
|
'last_error': None, # Error message if last run failed
|
|
'is_running': False, # Whether consolidation is currently in progress
|
|
'total_runs': 0, # Total number of consolidation runs
|
|
'successful_runs': 0, # Number of successful runs
|
|
}
|
|
|
|
|
|
def get_consolidation_status() -> dict:
|
|
"""Return the current consolidation status for the Web UI."""
|
|
return {
|
|
'last_run': _last_consolidation['last_run'],
|
|
'last_result': _last_consolidation['last_result'],
|
|
'last_error': _last_consolidation['last_error'],
|
|
'is_running': _last_consolidation['is_running'],
|
|
'total_runs': _last_consolidation['total_runs'],
|
|
'successful_runs': _last_consolidation['successful_runs'],
|
|
}
|
|
|
|
|
|
async def run_nightly_consolidation():
|
|
"""
|
|
Run the nightly memory consolidation process.
|
|
|
|
This is the entry point called by APScheduler. It:
|
|
1. Checks if Cheshire Cat is enabled and healthy
|
|
2. Skips if consolidation is already running
|
|
3. Sends a 'consolidate now' message via WebSocket to Cat
|
|
4. Logs the result and updates tracking state
|
|
"""
|
|
# Prevent overlapping runs
|
|
if _last_consolidation['is_running']:
|
|
logger.warning("🌙 Consolidation already running, skipping scheduled run")
|
|
return
|
|
|
|
if not globals.USE_CHESHIRE_CAT:
|
|
logger.info("🌙 Skipping scheduled consolidation: Cheshire Cat is disabled")
|
|
return
|
|
|
|
_last_consolidation['is_running'] = True
|
|
_last_consolidation['last_run'] = datetime.now().isoformat()
|
|
start_time = time.time()
|
|
|
|
try:
|
|
from utils.cat_client import cat_adapter
|
|
|
|
# Check Cat health before attempting
|
|
if not await cat_adapter.health_check():
|
|
logger.warning("🌙 Skipping scheduled consolidation: Cat is not healthy")
|
|
_last_consolidation['last_error'] = 'Cat health check failed'
|
|
_last_consolidation['is_running'] = False
|
|
_last_consolidation['total_runs'] += 1
|
|
return
|
|
|
|
logger.info("🌙 Starting nightly memory consolidation (scheduled)...")
|
|
|
|
# Trigger consolidation via WebSocket
|
|
# This runs synchronously within the hook and can take several minutes
|
|
result = await cat_adapter.trigger_consolidation()
|
|
|
|
elapsed = time.time() - start_time
|
|
|
|
if result:
|
|
logger.info(f"🌙 Nightly consolidation completed in {elapsed:.1f}s: {result[:200]}")
|
|
_last_consolidation['last_result'] = result
|
|
_last_consolidation['last_error'] = None
|
|
_last_consolidation['successful_runs'] += 1
|
|
else:
|
|
logger.error(f"🌙 Nightly consolidation returned no result after {elapsed:.1f}s")
|
|
_last_consolidation['last_error'] = 'No result returned (timeout or connection error)'
|
|
|
|
except Exception as e:
|
|
elapsed = time.time() - start_time
|
|
logger.error(f"🌙 Nightly consolidation failed after {elapsed:.1f}s: {e}")
|
|
_last_consolidation['last_error'] = str(e)
|
|
|
|
finally:
|
|
_last_consolidation['total_runs'] += 1
|
|
_last_consolidation['is_running'] = False
|