
Add video_contact_sheet: Generate contact sheets from videos using keyframes #426

Open
wants to merge 5 commits into master
19 changes: 19 additions & 0 deletions video_contact_sheet/README.md
@@ -0,0 +1,19 @@
# video_contact_sheet

Generate visually rich contact-sheet thumbnails (aka filmstrips) for any number
of videos—handy for quick QA or cataloging.

```bash
# single file
python -m video_contact_sheet.cli demo.mp4 -o out

# entire folder, 8 threads
python -m video_contact_sheet.cli /videos -o out --threads 8 --cols 6
```

## Features
- Scene-change detection picks “interesting” keyframes
- Multithreaded extraction using OpenCV + ffmpeg
- Footer shows duration / resolution / codec
- Pure Python; runs on Windows, Linux, and macOS
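
For programmatic use, `video_to_contact_sheet` can be called directly. A minimal sketch (the paths are placeholders):

```python
from pathlib import Path

from video_contact_sheet.core import video_to_contact_sheet

# Writes out/demo_sheet.jpg and returns its path.
sheet_path = video_to_contact_sheet(
    Path("demo.mp4"),
    Path("out"),
    max_frames=12,      # cap on extracted keyframes
    cols=4,             # grid columns
    scene_thresh=30.0,  # higher = fewer frames kept
)
print(sheet_path)
```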

10 changes: 10 additions & 0 deletions video_contact_sheet/__init__.py
@@ -0,0 +1,10 @@
"""
video_contact_sheet
~~~~~~~~~~~~~~~~~~~
Generate key-frame contact sheets (filmstrip grids) from videos.

    $ python -m video_contact_sheet.cli --help
"""

# __init__.py
__version__ = "0.1.0"
50 changes: 50 additions & 0 deletions video_contact_sheet/cli.py
@@ -0,0 +1,50 @@
"""
Command-line tool: process single files or whole folders, with optional multi-threading.
"""
from __future__ import annotations

import click
from pathlib import Path
from tqdm import tqdm

from .core import video_to_contact_sheet
from .utils import parallel_map

@click.command()
@click.argument("inputs", nargs=-1, type=click.Path(exists=True, path_type=Path))
@click.option("-o", "--out-dir", type=click.Path(path_type=Path), default="sheets")
@click.option("--max-frames", default=15, show_default=True)
@click.option("--cols", default=5, show_default=True, help="Columns in grid")
@click.option("--scene-thresh", default=30.0, show_default=True, help="Scene-change threshold (higher = fewer frames)")
@click.option("--threads", default=4, show_default=True, help="Parallel workers")
def main(
    inputs: tuple[Path, ...],
    out_dir: Path,
    max_frames: int,
    cols: int,
    scene_thresh: float,
    threads: int,
):
    """Generate contact sheets for videos or folders of videos."""

    vids = []
    for p in inputs:
        if p.is_dir():
            # Directories are searched recursively, but only for .mp4 files.
            vids.extend(list(p.rglob("*.mp4")))
        else:
            vids.append(p)
    if not vids:
        click.echo("No videos found.", err=True)
        raise SystemExit(1)

    click.echo(f"Processing {len(vids)} video(s)…")

    def task(v: Path) -> Path:
        return video_to_contact_sheet(
            v, out_dir, max_frames=max_frames, cols=cols, scene_thresh=scene_thresh
        )

    for _ in tqdm(parallel_map(task, vids, max_workers=threads), total=len(vids)):
        pass
    click.echo(f"Done! Saved to {out_dir}")


if __name__ == "__main__":
    main()
115 changes: 115 additions & 0 deletions video_contact_sheet/core.py
@@ -0,0 +1,115 @@
"""
Core logic: Keyframe extraction, contact table splicing.
"""
from __future__ import annotations

import math
import cv2
import numpy as np
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
from typing import Dict, List

from .utils import ffprobe_metadata


def extract_keyframes(
    video_path: Path, max_frames: int = 15, scene_thresh: float = 30.0
) -> List[np.ndarray]:
    """
    Return up to *max_frames* key frames (BGR ndarrays).
    Uses HSV histogram differences for simple scene-change detection.
    """
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open {video_path}")

    frames: List[np.ndarray] = []
    prev_hist = None

    while len(frames) < max_frames:
        ok, frame = cap.read()
        if not ok:
            break
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        hist = cv2.calcHist([hsv], [0, 1], None, [50, 60], [0, 180, 0, 256])
        hist = cv2.normalize(hist, hist).flatten()

        if prev_hist is None:
            # Always keep the first frame as the seed.
            frames.append(frame)
        else:
            diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
            if diff * 100 > scene_thresh:  # scale the 0-1 distance for intuition
                frames.append(frame)
        prev_hist = hist

    cap.release()
    if not frames:
        raise RuntimeError("No frames extracted")
    return frames


FONT = ImageFont.load_default()


def make_contact_sheet(
    frames: List[np.ndarray],
    metadata: Dict,
    cols: int = 5,
    margin: int = 8,
) -> Image.Image:
    """Lay *frames* out in a grid and draw a metadata footer; returns a PIL image."""
    rows = math.ceil(len(frames) / cols)
    h, w, _ = frames[0].shape
    sheet_w = w * cols + margin * (cols + 1)
    sheet_h = h * rows + margin * (rows + 1) + 60  # extra band for the footer text
    canvas = Image.new("RGB", (sheet_w, sheet_h), "black")

    for idx, frame in enumerate(frames):
        r = idx // cols
        c = idx % cols
        x = margin + c * (w + margin)
        y = margin + r * (h + margin)
        canvas.paste(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)), (x, y))

    draw = ImageDraw.Draw(canvas)
    text = (
        f"{metadata['file']} | {metadata['duration']:.1f}s "
        f"| {metadata['width']}x{metadata['height']} | {metadata['codec']}"
    )
    bbox = draw.textbbox((0, 0), text, font=FONT)
    tw = bbox[2] - bbox[0]
    th = bbox[3] - bbox[1]
    draw.text(((sheet_w - tw) // 2, sheet_h - th - 10), text, fill="white", font=FONT)
    return canvas


def video_to_contact_sheet(
    video_path: Path,
    out_dir: Path,
    max_frames: int = 15,
    cols: int = 5,
    scene_thresh: float = 30.0,
    quality: int = 85,
) -> Path:
    """Extract keyframes, build the sheet, and save it as <stem>_sheet.jpg in *out_dir*."""
    frames = extract_keyframes(video_path, max_frames=max_frames, scene_thresh=scene_thresh)
    meta = _collect_meta(video_path)
    sheet = make_contact_sheet(frames, meta, cols=cols)

    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{video_path.stem}_sheet.jpg"
    sheet.save(out_path, "JPEG", quality=quality)
    return out_path


def _collect_meta(video_path: Path) -> Dict:
    """Pull the fields used in the footer out of the ffprobe metadata."""
    info = ffprobe_metadata(video_path)
    v_stream = next(s for s in info["streams"] if s["codec_type"] == "video")
    return dict(
        file=video_path.name,
        duration=float(info["format"]["duration"]),
        width=int(v_stream["width"]),
        height=int(v_stream["height"]),
        codec=v_stream["codec_name"],
    )
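
The keyframe picker keeps a frame whenever the Bhattacharyya distance between consecutive HSV histograms, scaled by 100, exceeds `scene_thresh`. One way to choose a sensible threshold for a given clip is to print those scaled distances directly; a rough sketch reusing the same OpenCV calls (the helper name and sampling stride are ad hoc choices):

```python
from pathlib import Path

import cv2


def scaled_scene_diffs(video_path: Path, stride: int = 30):
    """Yield the scaled histogram distance between frames *stride* apart."""
    cap = cv2.VideoCapture(str(video_path))
    prev_hist = None
    idx = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if idx % stride == 0:
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            hist = cv2.calcHist([hsv], [0, 1], None, [50, 60], [0, 180, 0, 256])
            hist = cv2.normalize(hist, hist).flatten()
            if prev_hist is not None:
                # Same scaling as extract_keyframes: distance in [0, 1] times 100.
                yield cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA) * 100
            prev_hist = hist
        idx += 1
    cap.release()


# e.g. sorted(scaled_scene_diffs(Path("demo.mp4")))[-5:] shows the largest scene jumps
```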
6 changes: 6 additions & 0 deletions video_contact_sheet/requirements.txt
@@ -0,0 +1,6 @@
opencv-python>=4.9
ffmpeg-python>=0.2
Pillow>=10.3
tqdm>=4.66
click>=8.1
pytest>=8.2
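
pytest is pinned here but the PR ships no tests yet; a minimal sketch of one for `parallel_map` that needs no video fixtures (the file name `test_utils.py` is only a suggestion):

```python
# test_utils.py (suggested location)
from video_contact_sheet.utils import parallel_map


def test_parallel_map_preserves_order():
    # Squares computed on worker threads must come back in input order.
    assert list(parallel_map(lambda x: x * x, range(8), max_workers=4)) == [
        n * n for n in range(8)
    ]
```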
35 changes: 35 additions & 0 deletions video_contact_sheet/utils.py
@@ -0,0 +1,35 @@
"""
Shared utilities: ffprobe metadata, parallel helpers, paths.
"""
from __future__ import annotations

import json
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator

FFPROBE_CMD = [
    "ffprobe",
    "-v",
    "error",
    "-print_format",
    "json",
    "-show_format",
    "-show_streams",
]


def ffprobe_metadata(path: Path) -> Dict[str, Any]:
    """Return ffprobe JSON metadata for *path* (requires ffprobe on PATH)."""
    proc = subprocess.run(
        FFPROBE_CMD + [str(path)], capture_output=True, text=True, check=True
    )
    return json.loads(proc.stdout)


def parallel_map(func, iterable: Iterable, max_workers: int = 4) -> Iterator:
    """Thread-pool map that preserves input order and yields lazily, so progress bars update."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, item) for item in iterable]
        for future in futures:
            yield future.result()
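
Both helpers compose directly; for example, probing several files concurrently (the paths are placeholders, and `ffprobe` must be on PATH):

```python
from pathlib import Path

from video_contact_sheet.utils import ffprobe_metadata, parallel_map

videos = [Path("a.mp4"), Path("b.mp4")]

# Each yielded item is the parsed ffprobe JSON for the matching input path.
for video, info in zip(videos, parallel_map(ffprobe_metadata, videos, max_workers=2)):
    print(video.name, float(info["format"]["duration"]))
```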