275 lines
7.8 KiB
Python
Executable File
275 lines
7.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Fetch movie data for Hugo posts based on IMDB ID in frontmatter.
|
|
|
|
Scans all posts with an `imdb` field and fetches missing data:
|
|
- Poster (downloaded locally)
|
|
- Runtime
|
|
- Year
|
|
- Director
|
|
- Genres
|
|
|
|
Usage:
|
|
python scripts/fetch_movie_data.py # Process all movie posts
|
|
python scripts/fetch_movie_data.py --dry-run # Show what would be updated
|
|
python scripts/fetch_movie_data.py --force # Re-fetch even if data exists
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
import yaml
|
|
|
|
# Configuration
# The TMDB API key is read from a local ``config`` module (the messages below
# refer to it as scripts/config.py, created from config.example.py).
try:
    from config import TMDB_API_KEY
except ImportError as exc:
    # Only a ModuleNotFoundError naming "config" means the file is really
    # absent.  Any other ImportError means config.py was found but could not
    # provide TMDB_API_KEY (name missing, or one of its own imports failed),
    # so report that instead of a misleading "not found".
    if isinstance(exc, ModuleNotFoundError) and exc.name == "config":
        raise SystemExit("Error: scripts/config.py not found. Copy config.example.py to config.py and add your API key.")
    raise SystemExit(f"Error: could not import TMDB_API_KEY from scripts/config.py: {exc}")
|
|
|
|
# Paths
# resolve() pins the locations to the script's real position on disk, so they
# stay correct when __file__ is a relative path (e.g. the script is started
# from inside its own directory).
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent
CONTENT_DIR = PROJECT_ROOT / "content" / "posts"
IMAGES_DIR = PROJECT_ROOT / "static" / "images" / "posters"

# Regex to split frontmatter from content.
# Group 1 is the YAML between the two `---` fences.  Only spaces/tabs and a
# single line ending are consumed after each fence, so blank lines at the start
# of the body are preserved when a post is rewritten.  The closing fence may
# also be the very last thing in the file (no trailing newline).
FRONTMATTER_RE = re.compile(
    r'^---[ \t]*\r?\n(.*?)\r?\n---[ \t]*(?:\r?\n|\Z)',
    re.DOTALL,
)
|
|
|
|
|
|
def find_movie_by_imdb(imdb_id):
    """Look up a movie on TMDB by its IMDB ID.

    Calls TMDB's ``/find/{imdb_id}`` endpoint with
    ``external_source=imdb_id`` and returns the first entry of the
    response's ``movie_results`` list, or ``None`` when that list is
    missing or empty.  HTTP error statuses are raised by
    ``raise_for_status()``.
    """
    response = requests.get(
        f"https://api.themoviedb.org/3/find/{imdb_id}",
        params={"api_key": TMDB_API_KEY, "external_source": "imdb_id"},
        timeout=10,
    )
    response.raise_for_status()

    matches = response.json().get("movie_results", [])
    return matches[0] if matches else None
|
|
|
|
|
|
def get_movie_details(tmdb_id):
    """Fetch the TMDB record for one movie, with credits appended.

    Requests ``/movie/{tmdb_id}`` with ``append_to_response=credits`` and
    returns the decoded JSON body.  HTTP error statuses are raised by
    ``raise_for_status()``.
    """
    endpoint = f"https://api.themoviedb.org/3/movie/{tmdb_id}"
    query = {"api_key": TMDB_API_KEY, "append_to_response": "credits"}

    response = requests.get(endpoint, params=query, timeout=10)
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
def get_directors(credits):
    """Extract director names from a credits mapping.

    Args:
        credits: Mapping with an optional ``"crew"`` list of dicts, each
            carrying ``"job"`` and ``"name"`` keys.  ``None`` is accepted.

    Returns:
        The ``"name"`` of every crew entry whose ``"job"`` is exactly
        ``"Director"``, in crew order.  An empty list when there are no
        credits, no crew, or no directors.
    """
    # Tolerate a missing/None credits object or crew list instead of raising.
    crew = (credits or {}).get("crew") or []
    return [
        member["name"]
        for member in crew
        # Skip entries without a usable name rather than failing on KeyError.
        if member.get("job") == "Director" and member.get("name")
    ]
|
|
|
|
|
|
def slugify(title):
    """Convert a title to a lowercase, hyphen-separated ASCII slug.

    Args:
        title: The title to convert.  Non-string values (for example a YAML
            title such as ``1917`` that was parsed as an int) are converted
            with ``str()`` first.

    Returns:
        A string containing only ``a-z``, ``0-9`` and single hyphens, with
        no leading or trailing hyphen.  May be empty when the title has no
        ASCII-representable letters or digits.
    """
    # Local import keeps this function self-contained.
    import unicodedata

    # Decompose accented characters and drop the combining marks so that
    # "Amélie" becomes "amelie" instead of losing the letter entirely.
    text = unicodedata.normalize("NFKD", str(title))
    text = text.encode("ascii", "ignore").decode("ascii").lower()

    # Keep underscores here so the next step can turn them into hyphens.
    text = re.sub(r"[^a-z0-9\s_-]", "", text)
    text = re.sub(r"[\s_]+", "-", text)
    text = re.sub(r"-+", "-", text)
    return text.strip("-")
|
|
|
|
|
|
def download_poster(poster_path, filename):
    """Download a w500 poster image from TMDB into IMAGES_DIR.

    Returns the site-relative URL ``/images/posters/<filename>`` after the
    image bytes have been written, or ``None`` when ``poster_path`` is
    falsy.  HTTP error statuses are raised by ``raise_for_status()``.
    """
    if poster_path:
        response = requests.get(
            f"https://image.tmdb.org/t/p/w500{poster_path}",
            timeout=10,
        )
        response.raise_for_status()

        # Create the target directory on first use.
        IMAGES_DIR.mkdir(parents=True, exist_ok=True)
        (IMAGES_DIR / filename).write_bytes(response.content)
        return f"/images/posters/{filename}"

    return None
|
|
|
|
|
|
def parse_post(filepath):
    """Parse a markdown post into a frontmatter dict and a content string.

    The file is read as UTF-8 so the result does not depend on the
    platform's default encoding.

    Args:
        filepath: ``pathlib.Path`` of the markdown file.

    Returns:
        ``(frontmatter, content)``.  ``frontmatter`` is the parsed YAML
        mapping and ``content`` is everything after the frontmatter block.
        ``(None, text)`` with the full file text is returned when there is
        no frontmatter block, the YAML is invalid, or the YAML is not a
        mapping.  An empty frontmatter block yields ``(None, content)``.
    """
    text = filepath.read_text(encoding="utf-8")
    match = FRONTMATTER_RE.match(text)
    if not match:
        return None, text

    fm_text = match.group(1)
    content = text[match.end():]

    try:
        frontmatter = yaml.safe_load(fm_text)
    except yaml.YAMLError:
        return None, text

    # YAML that parses to a scalar or list cannot be used as frontmatter;
    # callers treat None as "skip this post".
    if frontmatter is not None and not isinstance(frontmatter, dict):
        return None, text

    return frontmatter, content


def write_post(filepath, frontmatter, content):
    """Write frontmatter and content back to a markdown file as UTF-8.

    The frontmatter is re-serialized with ``yaml.dump`` between ``---``
    fences and ``content`` is appended unchanged.

    Args:
        filepath: ``pathlib.Path`` of the file to overwrite.
        frontmatter: Mapping to serialize as the YAML frontmatter.
        content: Body text placed after the closing fence.
    """
    # Use default_flow_style=False for readable YAML
    # Use allow_unicode=True for proper character handling
    fm_text = yaml.dump(
        frontmatter,
        default_flow_style=False,
        allow_unicode=True,
        sort_keys=False,
    )
    text = f"---\n{fm_text}---\n{content}"
    # allow_unicode=True emits raw non-ASCII characters, so the encoding must
    # be explicit and must match the one used by parse_post.
    filepath.write_text(text, encoding="utf-8")
|
|
|
|
|
|
def process_post(filepath, dry_run=False, force=False):
    """Process a single post, fetching missing movie data.

    A post is considered only when its frontmatter has a truthy ``imdb``
    value.  It is updated when any of ``poster``, ``runtime``, ``year`` or
    ``director`` is missing, or when ``force`` is set.  ``genres`` is filled
    in as well during an update, but a missing ``genres`` alone does not
    trigger one.

    Args:
        filepath: ``pathlib.Path`` of the markdown post.
        dry_run: Only report what would be fetched; no network or file
            access beyond reading the post.
        force: Re-fetch every field even if it is already present.

    Returns:
        ``True`` if the post was rewritten (or, in dry-run mode, would be
        processed); ``False`` if it was skipped, the movie was not found,
        a TMDB request failed, or nothing changed.
    """
    frontmatter, content = parse_post(filepath)
    # None means "no usable frontmatter"; anything else that is not a mapping
    # cannot be queried with .get() either.
    if not isinstance(frontmatter, dict):
        return False

    imdb_id = frontmatter.get("imdb")
    if not imdb_id:
        return False

    # Check what's missing
    has_poster = bool(frontmatter.get("poster"))
    has_runtime = bool(frontmatter.get("runtime"))
    has_year = bool(frontmatter.get("year"))
    has_director = bool(frontmatter.get("director"))

    needs_update = not (has_poster and has_runtime and has_year and has_director)
    if not needs_update and not force:
        return False

    print(f"\nProcessing: {filepath.name}")
    print(f" IMDB: {imdb_id}")

    if dry_run:
        field_state = (
            ("poster", has_poster),
            ("runtime", has_runtime),
            ("year", has_year),
            ("director", has_director),
        )
        # With force every field is re-fetched, so list all of them instead
        # of printing an empty "Would fetch:" line.
        pending = [name for name, present in field_state if force or not present]
        print(f" Would fetch: {', '.join(pending)}")
        return True

    # Find movie on TMDB.  A failed request skips this post instead of
    # aborting the whole run, like the "not found" case below.
    print(" Finding movie on TMDB...")
    try:
        movie = find_movie_by_imdb(imdb_id)
    except requests.RequestException as exc:
        print(f" ERROR: TMDB lookup failed for IMDB ID {imdb_id}: {exc}")
        return False
    if not movie:
        print(f" ERROR: Movie not found for IMDB ID: {imdb_id}")
        return False

    tmdb_id = movie["id"]
    print(f" Found: {movie.get('title')} (TMDB: {tmdb_id})")

    # Get full details
    print(" Fetching details...")
    try:
        details = get_movie_details(tmdb_id)
    except requests.RequestException as exc:
        print(f" ERROR: Could not fetch TMDB details for {tmdb_id}: {exc}")
        return False

    updated = False

    # Update poster
    if not has_poster or force:
        poster_path = details.get("poster_path")
        if poster_path:
            # The title may be absent, empty, or parsed by YAML as a
            # non-string (e.g. 1917).  If it slugifies to nothing, fall back
            # to the IMDB ID so the file never ends up named just ".jpg".
            title = frontmatter.get("title") or "movie"
            slug = slugify(str(title)) or slugify(str(imdb_id)) or "movie"
            filename = f"{slug}.jpg"
            print(" Downloading poster...")
            try:
                poster_url = download_poster(poster_path, filename)
            except requests.RequestException as exc:
                # Keep going: the remaining metadata can still be saved.
                print(f" ERROR: Poster download failed: {exc}")
                poster_url = None
            if poster_url:
                frontmatter["poster"] = poster_url
                print(f" Poster saved: {poster_url}")
                updated = True

    # Update runtime
    if not has_runtime or force:
        runtime = details.get("runtime")
        if runtime:
            frontmatter["runtime"] = runtime
            print(f" Runtime: {runtime} minutes")
            updated = True

    # Update year
    if not has_year or force:
        release_date = details.get("release_date", "")
        if release_date:
            # Take the leading component of the "-"-separated date string.
            year = release_date.split("-")[0]
            frontmatter["year"] = int(year)
            print(f" Year: {year}")
            updated = True

    # Update director
    if not has_director or force:
        credits = details.get("credits", {})
        directors = get_directors(credits)
        if directors:
            # Store as string if single, list if multiple
            if len(directors) == 1:
                frontmatter["director"] = directors[0]
            else:
                frontmatter["director"] = directors
            print(f" Director: {', '.join(directors)}")
            updated = True

    # Update genres (bonus)
    if "genres" not in frontmatter or force:
        genres = [g["name"] for g in details.get("genres", [])]
        if genres:
            frontmatter["genres"] = genres
            updated = True

    if updated:
        write_post(filepath, frontmatter, content)
        print(" Updated!")

    return updated
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Processes either the single post given as the optional ``file``
    argument or every ``*.md`` file under CONTENT_DIR, then prints how many
    posts were (or, with ``--dry-run``, would be) updated.  Exits with
    status 1 when the given file does not exist.
    """
    parser = argparse.ArgumentParser(description="Fetch movie data for Hugo posts")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be updated")
    parser.add_argument("--force", action="store_true", help="Re-fetch even if data exists")
    parser.add_argument("file", nargs="?", help="Specific file to process")
    args = parser.parse_args()

    if args.file:
        filepath = Path(args.file)
        if not filepath.is_absolute():
            # Relative paths are resolved against the project root first.
            # If nothing exists there but the path is valid relative to the
            # current working directory, use that instead.
            from_root = PROJECT_ROOT / filepath
            if from_root.exists() or not filepath.exists():
                filepath = from_root
        if not filepath.exists():
            print(f"File not found: {filepath}")
            sys.exit(1)
        files = [filepath]
    else:
        # Sorted so posts are processed in a stable, reproducible order.
        files = sorted(CONTENT_DIR.glob("**/*.md"))

    print(f"Scanning {len(files)} posts for movie data...")

    updated = 0
    for filepath in files:
        if process_post(filepath, dry_run=args.dry_run, force=args.force):
            updated += 1

    print(f"\n{'Would update' if args.dry_run else 'Updated'}: {updated} posts")


if __name__ == "__main__":
    main()
|