Files
marcus-web/scripts/fetch_movie_data.py

275 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Fetch movie data for Hugo posts based on IMDB ID in frontmatter.
Scans all posts with an `imdb` field and fetches missing data:
- Poster (downloaded locally)
- Runtime
- Year
- Director
- Genres
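Usage:
python scripts/fetch_movie_data.py # Process all movie posts
python scripts/fetch_movie_data.py --dry-run # Show what would be updated
python scripts/fetch_movie_data.py --force # Re-fetch even if data exists
python scripts/fetch_movie_data.py content/posts/my-post.md # Process a single post (relative paths are resolved against the project root)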
"""
import argparse
import os
import re
import sys
from pathlib import Path
import requests
import yaml
# Configuration: the TMDB API key is imported from a `config` module that is
# expected to sit next to this script. If the import fails, exit with a
# message explaining how to create it from config.example.py.
try:
    from config import TMDB_API_KEY
except ImportError:
    sys.exit("Error: scripts/config.py not found. Copy config.example.py to config.py and add your API key.")
# Paths, all derived from the location of this script:
#   SCRIPT_DIR   - directory containing this file
#   PROJECT_ROOT - its parent directory
#   CONTENT_DIR  - markdown posts that are scanned
#   IMAGES_DIR   - where downloaded posters are stored
SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = SCRIPT_DIR.parent
CONTENT_DIR = PROJECT_ROOT.joinpath("content", "posts")
IMAGES_DIR = PROJECT_ROOT.joinpath("static", "images", "posters")

# Matches a leading `---` fenced frontmatter block; group 1 is the YAML text
# between the fences and the match ends right after the closing fence line.
FRONTMATTER_RE = re.compile(r'^---\s*\n(.*?)\n---\s*\n', flags=re.DOTALL)
def find_movie_by_imdb(imdb_id):
    """Look up a movie on TMDB through its IMDB identifier.

    Calls the TMDB ``/3/find/{imdb_id}`` endpoint with
    ``external_source=imdb_id`` and returns the first entry of the
    ``movie_results`` list from the JSON response, or ``None`` when that
    list is empty or absent. An HTTP error status raises through
    ``raise_for_status()``.
    """
    response = requests.get(
        f"https://api.themoviedb.org/3/find/{imdb_id}",
        params={"api_key": TMDB_API_KEY, "external_source": "imdb_id"},
        timeout=10,
    )
    response.raise_for_status()

    matches = response.json().get("movie_results", [])
    return matches[0] if matches else None
def get_movie_details(tmdb_id):
    """Fetch the full TMDB record for a movie, with credits appended.

    Requests ``/3/movie/{tmdb_id}`` with ``append_to_response=credits``
    and returns the decoded JSON body. An HTTP error status raises
    through ``raise_for_status()``.
    """
    endpoint = f"https://api.themoviedb.org/3/movie/{tmdb_id}"
    query = {"api_key": TMDB_API_KEY, "append_to_response": "credits"}

    response = requests.get(endpoint, params=query, timeout=10)
    response.raise_for_status()
    payload = response.json()
    return payload
def get_directors(credits):
    """Return the names of every crew member whose job is "Director".

    ``credits`` is a mapping with an optional ``"crew"`` list; each crew
    entry is a mapping. Entries are returned in their original order.
    """
    names = []
    for member in credits.get("crew", []):
        if member.get("job") == "Director":
            names.append(member["name"])
    return names
def slugify(title):
    """Convert a title to a URL- and filename-friendly slug.

    The result contains only ``a-z``, ``0-9`` and single hyphens, with no
    leading or trailing hyphen.

    Args:
        title: The title to convert. Non-string values (for example an
            int produced by YAML for a title like ``1917``) are converted
            with ``str()`` first.

    Returns:
        The slug. May be the empty string when the title contains no
        characters that survive ASCII folding.
    """
    import unicodedata  # local import keeps this block self-contained

    text = str(title)
    # Fold accented characters to their ASCII base ("é" -> "e") instead of
    # dropping them; anything with no ASCII decomposition is discarded.
    text = unicodedata.normalize("NFKD", text)
    text = text.encode("ascii", "ignore").decode("ascii").lower()

    # Turn whitespace and underscores into separators BEFORE stripping other
    # punctuation; stripping first would delete the underscores and glue the
    # neighbouring words together.
    slug = re.sub(r"[\s_]+", "-", text)
    slug = re.sub(r"[^a-z0-9-]", "", slug)
    slug = re.sub(r"-+", "-", slug)
    return slug.strip("-")
def download_poster(poster_path, filename):
    """Download a TMDB poster (w500 size) into the local posters directory.

    Returns ``None`` without any network access when ``poster_path`` is
    falsy. Otherwise the image is fetched, ``IMAGES_DIR`` is created if
    needed, the bytes are written to ``IMAGES_DIR / filename`` and the
    site-relative path ``/images/posters/<filename>`` is returned. An HTTP
    error status raises through ``raise_for_status()``.
    """
    if not poster_path:
        return None

    response = requests.get(
        f"https://image.tmdb.org/t/p/w500{poster_path}",
        timeout=10,
    )
    response.raise_for_status()

    IMAGES_DIR.mkdir(parents=True, exist_ok=True)
    (IMAGES_DIR / filename).write_bytes(response.content)
    return f"/images/posters/{filename}"
def parse_post(filepath):
    """Parse a markdown post into a frontmatter dict and a content string.

    Args:
        filepath: ``pathlib.Path`` of the markdown file to read.

    Returns:
        A ``(frontmatter, content)`` tuple. ``frontmatter`` is the mapping
        parsed from the YAML between the leading ``---`` fences and
        ``content`` is everything after the closing fence. When the file
        has no frontmatter block, the YAML is invalid, or the YAML is not
        a mapping, ``(None, text)`` is returned with the full file text.
    """
    # Read as UTF-8 explicitly so parsing does not depend on the platform's
    # locale encoding.
    text = filepath.read_text(encoding="utf-8")

    match = FRONTMATTER_RE.match(text)
    if not match:
        return None, text

    try:
        frontmatter = yaml.safe_load(match.group(1))
    except yaml.YAMLError:
        return None, text

    # safe_load yields None, a scalar or a list for non-mapping YAML; callers
    # use dict methods on the result, so treat those as "no frontmatter".
    if not isinstance(frontmatter, dict):
        return None, text

    return frontmatter, text[match.end():]
def write_post(filepath, frontmatter, content):
    """Write frontmatter and content back to a markdown file.

    Args:
        filepath: ``pathlib.Path`` of the file to overwrite.
        frontmatter: Mapping serialised as the YAML frontmatter block.
        content: Post body placed after the closing ``---`` fence.
    """
    # Block style (default_flow_style=False) keeps the YAML readable,
    # allow_unicode=True emits non-ASCII characters as-is rather than as
    # escapes, and sort_keys=False preserves the existing key order.
    fm_text = yaml.dump(
        frontmatter,
        default_flow_style=False,
        allow_unicode=True,
        sort_keys=False,
    )
    text = f"---\n{fm_text}---\n{content}"
    # allow_unicode=True means the text can contain non-ASCII characters, so
    # the encoding must be explicit; the locale default may be unable to
    # encode them.
    filepath.write_text(text, encoding="utf-8")
def _describe_request_error(exc):
    """Summarise a requests exception without echoing the request URL.

    The URL is deliberately left out because the TMDB calls in this script
    pass the API key as a query parameter.
    """
    response = getattr(exc, "response", None)
    if response is not None:
        return f"HTTP {response.status_code}"
    return type(exc).__name__


def process_post(filepath, dry_run=False, force=False):
    """Process a single post, fetching missing movie data from TMDB.

    Args:
        filepath: ``pathlib.Path`` of the markdown post.
        dry_run: When true, only report which fields would be fetched; the
            function returns before any network request or file write.
        force: When true, re-fetch every field even if the frontmatter
            already has a value for it.

    Returns:
        ``True`` when the post was updated (or, in dry-run mode, would be
        processed). ``False`` when the post is skipped (no parsable
        frontmatter, no ``imdb`` field, nothing missing and not forced),
        the movie is not found, a TMDB lookup fails, or no field changed.
    """
    frontmatter, content = parse_post(filepath)
    if frontmatter is None:
        return False

    imdb_id = frontmatter.get("imdb")
    if not imdb_id:
        return False

    # Fields whose absence triggers a fetch. "genres" is a bonus field: it is
    # filled in while processing but never triggers processing by itself.
    tracked = ("poster", "runtime", "year", "director")
    missing = [field for field in tracked if not frontmatter.get(field)]
    if not missing and not force:
        return False

    # With force every tracked field is refreshed, not just the missing ones.
    wanted = list(tracked) if force else missing

    print(f"\nProcessing: {filepath.name}")
    print(f" IMDB: {imdb_id}")

    if dry_run:
        print(f" Would fetch: {', '.join(wanted)}")
        return True

    # A failed lookup skips this post instead of aborting the whole run.
    try:
        print(" Finding movie on TMDB...")
        movie = find_movie_by_imdb(imdb_id)
        if not movie:
            print(f" ERROR: Movie not found for IMDB ID: {imdb_id}")
            return False

        tmdb_id = movie["id"]
        print(f" Found: {movie.get('title')} (TMDB: {tmdb_id})")

        print(" Fetching details...")
        details = get_movie_details(tmdb_id)
    except requests.RequestException as exc:
        print(f" ERROR: TMDB request failed: {_describe_request_error(exc)}")
        return False

    updated = False

    # Update poster
    if "poster" in wanted:
        poster_path = details.get("poster_path")
        if poster_path:
            # The title may be missing, empty, or a non-string (YAML reads
            # `title: 1917` as an int), so normalise before slugifying.
            slug = slugify(str(frontmatter.get("title") or "movie"))
            # A title with no ASCII letters or digits slugifies to ""; fall
            # back to the IMDB id rather than writing a file named ".jpg".
            filename = f"{slug or imdb_id}.jpg"
            print(" Downloading poster...")
            try:
                poster_url = download_poster(poster_path, filename)
            except requests.RequestException as exc:
                # A failed poster download should not discard the other fields.
                print(f" ERROR: Poster download failed: {_describe_request_error(exc)}")
                poster_url = None
            if poster_url:
                frontmatter["poster"] = poster_url
                print(f" Poster saved: {poster_url}")
                updated = True

    # Update runtime
    if "runtime" in wanted:
        runtime = details.get("runtime")
        if runtime:
            frontmatter["runtime"] = runtime
            print(f" Runtime: {runtime} minutes")
            updated = True

    # Update year (leading component of the release date)
    if "year" in wanted:
        release_date = details.get("release_date") or ""
        year = release_date.split("-")[0]
        # Skip values that are not plain digits instead of crashing in int().
        if year.isdigit():
            frontmatter["year"] = int(year)
            print(f" Year: {year}")
            updated = True

    # Update director
    if "director" in wanted:
        directors = get_directors(details.get("credits") or {})
        if directors:
            # Store as string if single, list if multiple
            frontmatter["director"] = directors[0] if len(directors) == 1 else directors
            print(f" Director: {', '.join(directors)}")
            updated = True

    # Update genres (bonus)
    if "genres" not in frontmatter or force:
        genres = [g["name"] for g in details.get("genres") or []]
        if genres:
            frontmatter["genres"] = genres
            updated = True

    if updated:
        write_post(filepath, frontmatter, content)
        print(" Updated!")

    return updated
def main():
    """Command-line entry point.

    Parses ``--dry-run``, ``--force`` and an optional positional ``file``,
    then runs ``process_post`` on either that single file or every ``*.md``
    file under ``CONTENT_DIR`` and prints how many posts were (or would be)
    updated. Exits with status 1 when the given file does not exist.
    """
    cli = argparse.ArgumentParser(description="Fetch movie data for Hugo posts")
    cli.add_argument("--dry-run", action="store_true", help="Show what would be updated")
    cli.add_argument("--force", action="store_true", help="Re-fetch even if data exists")
    cli.add_argument("file", nargs="?", help="Specific file to process")
    options = cli.parse_args()

    if options.file:
        target = Path(options.file)
        # Relative paths are resolved against the project root, not the
        # current working directory.
        if not target.is_absolute():
            target = PROJECT_ROOT / target
        if not target.exists():
            print(f"File not found: {target}")
            sys.exit(1)
        posts = [target]
    else:
        posts = list(CONTENT_DIR.glob("**/*.md"))

    print(f"Scanning {len(posts)} posts for movie data...")

    changed = sum(
        1
        for post in posts
        if process_post(post, dry_run=options.dry_run, force=options.force)
    )

    print(f"\n{'Would update' if options.dry_run else 'Updated'}: {changed} posts")


if __name__ == "__main__":
    main()