174 lines
6.0 KiB
Python
174 lines
6.0 KiB
Python
# utils/twitter_fetcher.py
|
|
|
|
import asyncio
|
|
import json
|
|
from typing import Dict, Any
|
|
from twscrape import API, gather, Account
|
|
from playwright.async_api import async_playwright
|
|
from pathlib import Path
|
|
|
|
COOKIE_PATH = Path(__file__).parent / "x.com.cookies.json"
|
|
|
|
async def extract_media_urls(page, tweet_url):
|
|
print(f"🔍 Visiting tweet page: {tweet_url}")
|
|
try:
|
|
await page.goto(tweet_url, timeout=15000)
|
|
await page.wait_for_timeout(1000)
|
|
|
|
media_elements = await page.query_selector_all("img[src*='pbs.twimg.com/media']")
|
|
urls = set()
|
|
|
|
for element in media_elements:
|
|
src = await element.get_attribute("src")
|
|
if src:
|
|
cleaned = src.split("&name=")[0] + "&name=large"
|
|
urls.add(cleaned)
|
|
|
|
print(f"🖼️ Found {len(urls)} media URLs on tweet: {tweet_url}")
|
|
return list(urls)
|
|
|
|
except Exception as e:
|
|
print(f"❌ Playwright error on {tweet_url}: {e}")
|
|
return []
|
|
|
|
async def fetch_miku_tweets(limit=5):
|
|
# Load cookies from JSON file
|
|
with open(COOKIE_PATH, "r", encoding="utf-8") as f:
|
|
cookie_list = json.load(f)
|
|
cookie_header = "; ".join(f"{c['name']}={c['value']}" for c in cookie_list)
|
|
|
|
# Add the account to twscrape
|
|
api = API()
|
|
await api.pool.add_account(
|
|
username="HSankyuu39",
|
|
password="x", # placeholder (won't be used)
|
|
email="x", # optional
|
|
email_password="x", # optional
|
|
cookies=cookie_header
|
|
)
|
|
await api.pool.login_all()
|
|
|
|
print(f"🔎 Searching for Miku tweets (limit={limit})...")
|
|
query = 'Hatsune Miku OR 初音ミク has:images after:2025'
|
|
tweets = await gather(api.search(query, limit=limit, kv={"product": "Top"}))
|
|
|
|
print(f"📄 Found {len(tweets)} tweets, launching browser...")
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.firefox.launch(headless=True)
|
|
context = await browser.new_context()
|
|
|
|
await context.route("**/*", lambda route, request: (
|
|
route.abort() if any([
|
|
request.resource_type in ["font", "stylesheet"],
|
|
"analytics" in request.url,
|
|
"googletagmanager" in request.url,
|
|
"ads-twitter" in request.url,
|
|
]) else route.continue_()
|
|
))
|
|
|
|
page = await context.new_page()
|
|
|
|
results = []
|
|
for i, tweet in enumerate(tweets, 1):
|
|
username = tweet.user.username
|
|
tweet_url = f"https://twitter.com/{username}/status/{tweet.id}"
|
|
print(f"🧵 Processing tweet {i}/{len(tweets)} from @{username}")
|
|
media_urls = await extract_media_urls(page, tweet_url)
|
|
|
|
if media_urls:
|
|
results.append({
|
|
"username": username,
|
|
"text": tweet.rawContent,
|
|
"url": tweet_url,
|
|
"media": media_urls
|
|
})
|
|
|
|
await browser.close()
|
|
print(f"✅ Finished! Returning {len(results)} tweet(s) with media.")
|
|
return results
|
|
|
|
|
|
async def _search_latest(api: API, query: str, limit: int) -> list:
|
|
# kv product "Latest" to search by latest
|
|
try:
|
|
return await gather(api.search(query, limit=limit, kv={"product": "Latest"}))
|
|
except Exception as e:
|
|
print(f"⚠️ Latest search failed for '{query}': {e}")
|
|
return []
|
|
|
|
|
|
async def fetch_figurine_tweets_latest(limit_per_source: int = 10) -> list:
|
|
"""Search three sources by Latest, collect tweets with images, and return unified list of dicts.
|
|
Sources:
|
|
- "miku figure from:mecchaJP"
|
|
- "miku from:GoodSmile_US"
|
|
- "miku from:OtakuOwletMerch"
|
|
"""
|
|
# Load cookies
|
|
with open(COOKIE_PATH, "r", encoding="utf-8") as f:
|
|
cookie_list = json.load(f)
|
|
cookie_header = "; ".join(f"{c['name']}={c['value']}" for c in cookie_list)
|
|
|
|
api = API()
|
|
await api.pool.add_account(
|
|
username="HSankyuu39",
|
|
password="x",
|
|
email="x",
|
|
email_password="x",
|
|
cookies=cookie_header
|
|
)
|
|
await api.pool.login_all()
|
|
|
|
queries = [
|
|
"miku figure from:mecchaJP",
|
|
"miku from:GoodSmile_US",
|
|
"miku from:OtakuOwletMerch",
|
|
]
|
|
|
|
print("🔎 Searching figurine tweets by Latest across sources...")
|
|
all_tweets = []
|
|
for q in queries:
|
|
tweets = await _search_latest(api, q, limit_per_source)
|
|
all_tweets.extend(tweets)
|
|
|
|
print(f"📄 Found {len(all_tweets)} candidate tweets, launching browser to extract media...")
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.firefox.launch(headless=True)
|
|
context = await browser.new_context()
|
|
await context.route("**/*", lambda route, request: (
|
|
route.abort() if any([
|
|
request.resource_type in ["font", "stylesheet"],
|
|
"analytics" in request.url,
|
|
"googletagmanager" in request.url,
|
|
"ads-twitter" in request.url,
|
|
]) else route.continue_()
|
|
))
|
|
|
|
page = await context.new_page()
|
|
results = []
|
|
for i, tweet in enumerate(all_tweets, 1):
|
|
try:
|
|
username = tweet.user.username
|
|
tweet_url = f"https://twitter.com/{username}/status/{tweet.id}"
|
|
print(f"🧵 Processing tweet {i}/{len(all_tweets)} from @{username}")
|
|
media_urls = await extract_media_urls(page, tweet_url)
|
|
if media_urls:
|
|
results.append({
|
|
"username": username,
|
|
"text": tweet.rawContent,
|
|
"url": tweet_url,
|
|
"media": media_urls
|
|
})
|
|
except Exception as e:
|
|
print(f"⚠️ Error processing tweet: {e}")
|
|
|
|
await browser.close()
|
|
print(f"✅ Figurine fetch finished. Returning {len(results)} tweet(s) with media.")
|
|
return results
|
|
|
|
|
|
# Note: fetch_tweet_by_url was removed - now using twscrape-based approach in figurine_notifier.py
|
|
# This avoids Playwright browser dependencies while maintaining functionality
|