# miku-discord/bot/.bak.bot.py.250625
import discord
import aiohttp
import asyncio
import os
import re
import random
import string
import base64
import subprocess
import aiofiles

from datetime import datetime, timedelta
from collections import defaultdict, deque

from discord import File, Status

from langchain_community.vectorstores import FAISS
from langchain_ollama import OllamaEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.docstore.document import Document

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger
scheduler = AsyncIOScheduler()

BEDTIME_CHANNEL_IDS = [761014220707332107]

# Stores last 5 exchanges per user (as deque)
conversation_history = defaultdict(lambda: deque(maxlen=5))

DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://ollama:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "mistral")

embeddings = OllamaEmbeddings(
    model=OLLAMA_MODEL,
    base_url=OLLAMA_URL
)

# Set up Discord client
intents = discord.Intents.default()
intents.message_content = True
intents.members = True
intents.presences = True
client = discord.Client(intents=intents)

current_model = None  # Track currently loaded model name

KINDNESS_KEYWORDS = [
    "thank you", "love you", "luv u", "you're the best", "so cute",
    "adorable", "amazing", "sweet", "kind", "great job", "well done",
    "precious", "good girl", "cutie", "angel", "my favorite", "so helpful"
]

HEART_REACTIONS = ["💙", "💝", "💖", "💕", "💜", "❤️‍🔥", "☺️"]

kindness_reacted_messages = set()
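
# Runtime prerequisites, as referenced elsewhere in this file (paths and IDs are
# specific to this deployment and may need adjusting):
#   - environment variables: DISCORD_BOT_TOKEN (required), OLLAMA_URL, OLLAMA_MODEL
#   - text files in the working directory: miku_prompt.txt, miku_lore.txt, miku_lyrics.txt
#   - MikuMikuBeam.mp4, the DejaVuSans-Bold font, and an ffmpeg binary for the
#     username-overlay video command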
# Switch model
async def switch_model(model_name: str, timeout: int = 600):
    global current_model
    if current_model == model_name:
        print(f"🔁 Model '{model_name}' already loaded.")
        return

    # Unload all other models to clear VRAM.
    # Ollama lists running models via GET /api/ps and unloads a model when it
    # receives a generate request with keep_alive set to 0.
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{OLLAMA_URL}/api/ps") as resp:
            if resp.status == 200:
                data = await resp.json()
                loaded_models = data.get("models", [])
                for model in loaded_models:
                    if model["name"] != model_name:
                        print(f"🔁 Unloading model: {model['name']}")
                        await session.post(
                            f"{OLLAMA_URL}/api/generate",
                            json={"model": model["name"], "keep_alive": 0}
                        )
            else:
                print("⚠️ Failed to check currently loaded models.")

    print(f"🔄 Switching to model '{model_name}'...")

    # Warm up the new model (dummy call to preload it)
    payload = {
        "model": model_name,
        "prompt": "Hello",
        "stream": False
    }
    headers = {"Content-Type": "application/json"}

    # Poll until /api/generate returns 200
    async with aiohttp.ClientSession() as session:
        for _ in range(timeout):
            async with session.post(f"{OLLAMA_URL}/api/generate", json=payload, headers=headers) as resp:
                if resp.status == 200:
                    current_model = model_name
                    print(f"✅ Model {model_name} ready!")
                    return
            await asyncio.sleep(1)  # Wait a second before trying again

    raise TimeoutError(f"Timed out waiting for model '{model_name}' to become available.")
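
# The regex below is meant to catch "miku" used as an address rather than in
# passing: the name at the end of the message or followed by punctuation
# ("hey Miku!", "Miku, can you..."), optionally wrapped in a symbol or two,
# but not embedded inside another word ("mikufan"). Replies to one of the
# bot's own messages also count as addressing Miku.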
async def is_miku_addressed(message) -> bool:
    # If message is a reply, check the referenced message author
    if message.reference:
        try:
            referenced_msg = await message.channel.fetch_message(message.reference.message_id)
            if referenced_msg.author == message.guild.me:  # or client.user if you use client
                return True
        except Exception as e:
            print(f"⚠️ Could not fetch referenced message: {e}")

    cleaned = message.content.strip()
    return bool(re.search(
        r'(?<![\w\(])(?:[^\w\s]{0,2}\s*)?miku(?:\s*[^\w\s]{0,2})?(?=,|\s*,|[!\.?\s]*$)',
        cleaned,
        re.IGNORECASE
    ))
async def download_and_encode_image(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                return None
            img_bytes = await resp.read()
            return base64.b64encode(img_bytes).decode('utf-8')
async def analyze_image_with_qwen(base64_img):
    # Note: despite the name, this currently uses the 'moondream' vision model.
    await switch_model("moondream")
    payload = {
        "model": "moondream",
        "prompt": "Describe this image in detail.",
        "images": [base64_img],
        "stream": False
    }
    headers = {"Content-Type": "application/json"}
    async with aiohttp.ClientSession() as session:
        async with session.post(f"{OLLAMA_URL}/api/generate", json=payload, headers=headers) as response:
            if response.status == 200:
                data = await response.json()
                return data.get("response", "No description.")
            else:
                return f"Error: {response.status}"
async def rephrase_as_miku(qwen_output, user_prompt):
    await switch_model(OLLAMA_MODEL)  # likely llama3

    with open("miku_prompt.txt", "r", encoding="utf-8") as f:
        system_prompt = f.read()

    relevant_docs_lore = miku_vectorstore.similarity_search(qwen_output, k=3)
    context = "\n\n".join([doc.page_content for doc in relevant_docs_lore])

    full_prompt = (
        f"{context}\n\n"
        f"The user asked: \"{user_prompt}\"\n"
        f"The image contains: \"{qwen_output}\"\n\n"
        f"Respond like Miku: cheerful, helpful, and opinionated when asked.\n\n"
        f"Miku:"
    )

    payload = {
        "model": OLLAMA_MODEL,
        "prompt": full_prompt,
        "system": system_prompt,
        "stream": False
    }
    headers = {"Content-Type": "application/json"}
    async with aiohttp.ClientSession() as session:
        async with session.post(f"{OLLAMA_URL}/api/generate", json=payload, headers=headers) as response:
            if response.status == 200:
                data = await response.json()
                return data.get("response", "No response.")
            else:
                return f"Error: {response.status}"
# Load and index once at startup
def load_miku_knowledge():
    with open("miku_lore.txt", "r", encoding="utf-8") as f:
        text = f.read()

    from langchain.text_splitter import RecursiveCharacterTextSplitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=520,
        chunk_overlap=50,
        separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
    )
    docs = [Document(page_content=chunk) for chunk in text_splitter.split_text(text)]
    vectorstore = FAISS.from_documents(docs, embeddings)
    return vectorstore

def load_miku_lyrics():
    with open("miku_lyrics.txt", "r", encoding="utf-8") as f:
        lyrics_text = f.read()

    text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    docs = [Document(page_content=chunk) for chunk in text_splitter.split_text(lyrics_text)]
    vectorstore = FAISS.from_documents(docs, embeddings)
    return vectorstore

miku_vectorstore = load_miku_knowledge()
miku_lyrics_vectorstore = load_miku_lyrics()
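
# query_ollama below keeps per-caller history in conversation_history, keyed by
# whatever user_id string it is given. Scheduled/internal callers pass synthetic
# keys ("weekly-motivation", "bedtime-miku", "kindness-check"), so their
# exchanges stay separate from real users' per-user 5-exchange histories.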
async def query_ollama(user_prompt, user_id):
    relevant_docs_lore = miku_vectorstore.similarity_search(user_prompt, k=3)
    relevant_docs_lyrics = miku_lyrics_vectorstore.similarity_search(user_prompt, k=3)

    context_lore = "\n".join([doc.page_content for doc in relevant_docs_lore])
    context_lyrics = "\n".join([doc.page_content for doc in relevant_docs_lyrics])

    combined_docs = relevant_docs_lore + relevant_docs_lyrics
    context = "\n\n".join([doc.page_content for doc in combined_docs])
    # (context is currently unused below; the prompt is built from context_lore and context_lyrics)

    # Persona definition
    with open("miku_prompt.txt", "r", encoding="utf-8") as f:
        system_prompt = f.read()

    # Build conversation history
    history = conversation_history[user_id]
    history_text = "\n".join([f"User: {u}\nMiku: {m}" for u, m in history])

    # Combine prompt
    full_prompt = (
        f"{context_lore}\n\n{context_lyrics}\n\n"
        f"{history_text}\nUser: {user_prompt}\nMiku:"
    )

    headers = {'Content-Type': 'application/json'}
    payload = {
        "model": OLLAMA_MODEL,
        "prompt": full_prompt,
        "system": system_prompt,
        "stream": False
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(f"{OLLAMA_URL}/api/generate", json=payload, headers=headers) as response:
            if response.status == 200:
                data = await response.json()
                reply = data.get("response", "No response.")
                # Save to conversation history
                conversation_history[user_id].append((user_prompt, reply))
                return reply
            else:
                return f"Error: {response.status}"
async def send_monday_video():
    await switch_model(OLLAMA_MODEL)

    # Generate a motivational message
    prompt = "It's Miku Monday! Give me an energetic and heartfelt Miku Monday morning message to inspire someone for the week ahead."
    response = await query_ollama(prompt, user_id="weekly-motivation")

    video_url = "http://zip.koko210cloud.xyz/u/zEgU7Z.mp4"
    target_channel_ids = [
        761014220707332107,
        1140377617237807266
    ]

    for channel_id in target_channel_ids:
        channel = client.get_channel(channel_id)
        if channel is None:
            print(f"❌ Could not find channel with ID {channel_id}. Make sure the bot is in the server.")
            continue  # don't let one missing channel stop the others
        try:
            await channel.send(content=response)
            # Send video link
            await channel.send(f"[Happy Miku Monday!]({video_url})")
            print(f"✅ Sent Monday video to channel ID {channel_id}")
        except Exception as e:
            print(f"⚠️ Failed to send video to channel ID {channel_id}: {e}")
async def send_bedtime_reminder():
    await switch_model(OLLAMA_MODEL)

    for channel_id in BEDTIME_CHANNEL_IDS:
        channel = client.get_channel(channel_id)
        if not channel:
            print(f"⚠️ Channel ID {channel_id} not found.")
            continue

        guild = channel.guild

        # Filter online members (excluding bots)
        online_members = [
            member for member in guild.members
            if member.status in {Status.online, Status.idle, Status.dnd}
            and not member.bot
        ]

        specific_user_id = 214857593045254151  # target user ID
        specific_user = guild.get_member(specific_user_id)
        if specific_user and specific_user not in online_members:
            online_members.append(specific_user)

        if not online_members:
            print(f"😴 No online members to ping in {guild.name}")
            continue

        chosen_one = random.choice(online_members)

        # Generate bedtime message
        prompt = (
            f"Write a sweet, funny, or encouraging bedtime message to remind someone it's getting late and they should sleep. "
            f"Make it short and wholesome, as if Miku is genuinely worried about their well-being. Imply that it's not good staying up so late."
        )
        bedtime_message = await query_ollama(prompt, user_id="bedtime-miku")

        try:
            await channel.send(f"{chosen_one.mention}, {bedtime_message}")
            print(f"🌙 Sent bedtime reminder to {chosen_one.display_name} in {guild.name}")
        except Exception as e:
            print(f"⚠️ Failed to send bedtime reminder in {guild.name}: {e}")
def schedule_random_bedtime():
    now = datetime.now()
    target_time = now.replace(hour=20, minute=30, second=0, microsecond=0)

    # If it's already past 20:30 today, schedule for tomorrow
    if now > target_time:
        target_time += timedelta(days=1)

    # Add a random offset (0-29 mins), so the reminder lands somewhere between 20:30 and 20:59
    offset_minutes = random.randint(0, 29)
    run_time = target_time + timedelta(minutes=offset_minutes)

    scheduler.add_job(send_bedtime_reminder, trigger=DateTrigger(run_date=run_time))
    print(f"⏰ Bedtime reminder scheduled for {run_time.strftime('%Y-%m-%d %H:%M:%S')}")
async def overlay_username_with_ffmpeg(base_video_path, output_path, username):
    font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
    text = f"@{username}"

    # Overlay positions (x, y), indexed by position id
    positions = {
        1: ("250", "370"),
        2: ("330", "130"),
        3: ("300", "90"),
        4: ("380", "180"),
        5: ("365", "215"),
        6: ("55", "365"),
        7: ("290", "130"),
        8: ("320", "210"),
        9: ("310", "240"),
        10: ("400", "240")
    }

    # Each entry: (start_time, end_time, position_index, text_type)
    text_entries = [
        (4.767, 5.367, 1, "username"),
        (5.4, 5.967, 2, "username"),
        (6.233, 6.833, 3, "username"),
        (6.967, 7.6, 4, "username"),
        (7.733, 8.367, 5, "username"),
        (8.667, 9.133, 6, "username"),
        (9.733, 10.667, 7, "username"),
        (11.6, 12.033, 8, "@everyone"),
        (12.067, 13.0, 9, "@everyone"),
        (13.033, 14.135, 10, "@everyone"),
    ]

    # Build drawtext filters
    drawtext_filters = []
    for start, end, pos_id, text_type in text_entries:
        x_coord, y_coord = positions[pos_id]

        # Determine actual text content
        text_content = f"@{username}" if text_type == "username" else text_type

        x = f"{x_coord} - text_w/2"
        y = f"{y_coord} - text_h/2"

        filter_str = (
            f"drawtext=text='{text_content}':"
            f"fontfile='{font_path}':"
            f"fontcolor=black:fontsize=30:x={x}:y={y}:"
            f"enable='between(t,{start},{end})'"
        )
        drawtext_filters.append(filter_str)

    vf_string = ",".join(drawtext_filters)

    ffmpeg_command = [
        "ffmpeg",
        "-y",  # overwrite the output file if it already exists
        "-i", base_video_path,
        "-vf", vf_string,
        "-codec:a", "copy",
        output_path
    ]

    # Run ffmpeg as an asyncio subprocess so encoding doesn't block the event loop.
    process = await asyncio.create_subprocess_exec(*ffmpeg_command)
    returncode = await process.wait()
    if returncode == 0:
        print("✅ Video processed successfully with username overlays.")
    else:
        print(f"⚠️ FFmpeg exited with code {returncode}")
async def detect_and_react_to_kindness(message, after_reply=False):
    if message.id in kindness_reacted_messages:
        return  # Already reacted — skip

    content = message.content.lower()
    emoji = random.choice(HEART_REACTIONS)

    # 1. Keyword-based detection
    if any(keyword in content for keyword in KINDNESS_KEYWORDS):
        try:
            await message.add_reaction(emoji)
            kindness_reacted_messages.add(message.id)  # Mark as done
            print("✅ Kindness detected via keywords. Reacted immediately.")
        except Exception as e:
            print(f"⚠️ Error adding reaction: {e}")
        return

    # 2. If not after_reply, defer model-based check
    if not after_reply:
        print("🗝️ No kindness via keywords. Deferring...")
        return

    # 3. Model-based detection
    try:
        prompt = (
            "The following message was sent to Miku the bot. "
            "Does it sound like the user is being kind or affectionate toward Miku? "
            "Answer with 'yes' or 'no' only.\n\n"
            f"Message: \"{message.content}\""
        )
        result = await query_ollama(prompt, user_id="kindness-check")

        if result.strip().lower().startswith("yes"):
            await message.add_reaction(emoji)
            kindness_reacted_messages.add(message.id)
            print("✅ Kindness detected via model. Reacted.")
        else:
            print("🧊 No kindness detected.")
    except Exception as e:
        print(f"⚠️ Error during kindness analysis: {e}")
@client.event
async def on_ready():
    print(f'🎤 MikuBot connected as {client.user}')

    # on_ready can fire again after reconnects, so only set up the scheduler once
    if scheduler.running:
        return

    # Schedule the weekly task (Monday 07:30)
    scheduler.add_job(send_monday_video, 'cron', day_of_week='mon', hour=7, minute=30)

    # Schedule first bedtime reminder
    schedule_random_bedtime()

    # Reschedule every midnight
    scheduler.add_job(schedule_random_bedtime, 'cron', hour=0, minute=0)
    #scheduler.add_job(send_bedtime_reminder, 'cron', hour=12, minute=22)

    scheduler.start()
@client.event
async def on_message(message):
    if message.author == client.user:
        return

    if message.content.strip().lower() == "miku, rape this nigga balls" and message.reference:
        async with message.channel.typing():
            # Get replied-to user
            try:
                replied_msg = await message.channel.fetch_message(message.reference.message_id)
                target_username = replied_msg.author.display_name

                # Prepare video
                base_video = "MikuMikuBeam.mp4"
                output_video = f"/tmp/video_{''.join(random.choices(string.ascii_letters, k=5))}.mp4"
                await overlay_username_with_ffmpeg(base_video, output_video, target_username)

                caption = f"Here you go, @{target_username}! 🌟"
                #await message.channel.send(content=caption, file=discord.File(output_video))
                await replied_msg.reply(file=discord.File(output_video))
            except Exception as e:
                print(f"⚠️ Error processing video: {e}")
                await message.channel.send("Sorry, something went wrong while generating the video.")
        return

    text = message.content.strip()

    if await is_miku_addressed(message):
        prompt = text  # No cleanup — keep it raw

        # 1st kindness check with just keywords
        await detect_and_react_to_kindness(message)

        async with message.channel.typing():
            # If message has an image attachment
            if message.attachments:
                for attachment in message.attachments:
                    if any(attachment.filename.lower().endswith(ext) for ext in [".jpg", ".jpeg", ".png", ".webp"]):
                        base64_img = await download_and_encode_image(attachment.url)
                        if not base64_img:
                            await message.channel.send("I couldn't load the image, sorry!")
                            return

                        # Analyze image (objective description)
                        qwen_description = await analyze_image_with_qwen(base64_img)
                        miku_reply = await rephrase_as_miku(qwen_description, prompt)
                        await message.channel.send(miku_reply)
                        return

            # If message is just a prompt, no image
            response = await query_ollama(prompt, user_id=str(message.author.id))
            await message.channel.send(response)

        # 2nd kindness check (only if no keywords detected)
        await detect_and_react_to_kindness(message, after_reply=True)

    if message.content.lower().strip() == "!reset":
        conversation_history[str(message.author.id)].clear()
        await message.channel.send("Okay! Memory reset for you~ ✨")

    # Manual Monday test command
    if message.content.lower().strip() == "!monday":
        await send_monday_video()
        #await message.channel.send("✅ Monday message sent (or attempted). Check logs.")
        return

client.run(DISCORD_BOT_TOKEN)