import argparse
import csv
import json
import os
import subprocess
from pathlib import Path
from tqdm import tqdm
# Lower-case file suffixes (including the leading dot) that are treated as video files.
VIDEO_EXTS = {
    ".mp4",
    ".mov",
    ".m4v",
    ".mkv",
    ".avi",
    ".webm",
    ".mpg",
    ".mpeg",
    ".ts",
}
def run_ffprobe(ffprobe_path: str, video_path: Path) -> dict | None:
"""
Returns a dict with codec_name, r_frame_rate, avg_frame_rate, time_base, duration, format_name.
Uses JSON output for robust parsing.
"""
cmd = [
ffprobe_path,
"-hide_banner",
"-v", "error",
"-select_streams", "v:0",
"-show_entries",
"stream=codec_name,r_frame_rate,avg_frame_rate,time_base",
"-show_entries",
"format=duration,format_name",
"-of", "json",
str(video_path),
]
try:
out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
data = json.loads(out)
stream = (data.get("streams") or [{}])[0]
fmt = data.get("format") or {}
return {
"codec_name": stream.get("codec_name", ""),
"r_frame_rate": stream.get("r_frame_rate", ""),
"avg_frame_rate": stream.get("avg_frame_rate", ""),
"time_base": stream.get("time_base", ""),
"format_name": fmt.get("format_name", ""),
"duration": fmt.get("duration", ""),
}
except subprocess.CalledProcessError as e:
return {"error": e.output.strip()}
except Exception as e:
return {"error": str(e)}
def frac_to_float(frac: str) -> float | None:
"""Convert '30000/1001' to float. Returns None if empty/invalid."""
if not frac or frac == "0/0":
return None
try:
num, den = frac.split("/")
num_f = float(num)
den_f = float(den)
if den_f == 0:
return None
return num_f / den_f
except Exception:
return None
def is_vfr(r_rate: str, avg_rate: str, tol: float = 1e-6) -> bool | None:
    """
    Guess whether a stream is variable frame rate (VFR).

    Both rates are 'num/den' strings. The stream is flagged as likely VFR
    when the two parsed rates differ by more than ``tol``. Returns None when
    either rate cannot be parsed.
    """
    parsed = [frac_to_float(r_rate), frac_to_float(avg_rate)]
    if any(rate is None for rate in parsed):
        return None
    real_rate, average_rate = parsed
    return abs(real_rate - average_rate) > tol
def bucket_fps(fps: float | None) -> str:
"""Bucket fps into 24/30/60/other (rough) for summary."""
if fps is None:
return "unknown"
# handle 23.976, 24.0
if abs(fps - 24.0) < 0.2 or abs(fps - 23.976) < 0.2:
return "≈24"
if abs(fps - 30.0) < 0.2 or abs(fps - 29.97) < 0.2:
return "≈30"
if abs(fps - 60.0) < 0.3 or abs(fps - 59.94) < 0.3:
return "≈60"
return "other"
def iter_videos(root: Path):
    """Lazily yield every regular file under ``root`` (recursively) whose
    lower-cased suffix is listed in VIDEO_EXTS."""
    yield from (
        candidate
        for candidate in root.rglob("*")
        if candidate.is_file() and candidate.suffix.lower() in VIDEO_EXTS
    )
def main():
    """Probe every video under a directory and write a frame-rate report.

    Command-line arguments:
        input_dir: Directory that is searched recursively for videos.
        --ffprobe: ffprobe executable to run (default: "ffprobe").
        --out: Path of the CSV report (default: "ffprobe_report.csv").

    One CSV row is written per video file. Files ffprobe could not analyse
    get their message in the "error" column and are counted in the "unknown"
    bucket. A short summary is printed to stdout afterwards.

    Exits through ``ArgumentParser.error`` when input_dir is not an existing
    directory.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("input_dir", help="Directory containing videos")
    ap.add_argument("--ffprobe", default="ffprobe", help="Path to ffprobe (default: ffprobe)")
    ap.add_argument("--out", default="ffprobe_report.csv", help="Output CSV path")
    args = ap.parse_args()

    root = Path(args.input_dir).expanduser().resolve()
    out_csv = Path(args.out).expanduser().resolve()

    # Fail fast: a mistyped path would otherwise yield an empty report that
    # looks like a successful scan of a directory without videos.
    if not root.is_dir():
        ap.error(f"input_dir is not a directory: {root}")

    fieldnames = [
        "path", "codec_name", "format_name", "duration_sec",
        "r_frame_rate", "avg_frame_rate", "r_fps", "avg_fps",
        "vfr", "time_base", "fps_bucket", "error"
    ]

    rows = []
    summary = {"≈24": 0, "≈30": 0, "≈60": 0, "other": 0, "unknown": 0}
    vfr_count = 0
    cfr_count = 0
    err_count = 0

    for vid in tqdm(iter_videos(root)):
        info = run_ffprobe(args.ffprobe, vid)

        # Start from an all-empty row so every CSV column is always present.
        row = dict.fromkeys(fieldnames, "")
        row["path"] = str(vid)

        if info is None or "error" in info:
            row["error"] = (info or {}).get("error", "unknown error")
            err_count += 1
            row["fps_bucket"] = "unknown"
            summary["unknown"] += 1
            rows.append(row)
            continue

        row["codec_name"] = info["codec_name"]
        row["format_name"] = info["format_name"]
        row["duration_sec"] = info["duration"]
        row["r_frame_rate"] = info["r_frame_rate"]
        row["avg_frame_rate"] = info["avg_frame_rate"]
        row["time_base"] = info["time_base"]

        r_fps = frac_to_float(info["r_frame_rate"])
        a_fps = frac_to_float(info["avg_frame_rate"])
        row["r_fps"] = f"{r_fps:.6f}" if r_fps is not None else ""
        row["avg_fps"] = f"{a_fps:.6f}" if a_fps is not None else ""

        # A None verdict (rate missing) leaves the column blank and is
        # counted as neither CFR nor VFR.
        vfr = is_vfr(info["r_frame_rate"], info["avg_frame_rate"])
        if vfr is not None:
            row["vfr"] = "1" if vfr else "0"
            if vfr:
                vfr_count += 1
            else:
                cfr_count += 1

        # bucket using avg fps if available, else r fps
        fps_for_bucket = a_fps if a_fps is not None else r_fps
        bucket = bucket_fps(fps_for_bucket)
        row["fps_bucket"] = bucket
        summary[bucket] += 1

        rows.append(row)

    # Write CSV
    out_csv.parent.mkdir(parents=True, exist_ok=True)
    with out_csv.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows)

    total = len(rows)
    print(f"Wrote: {out_csv}")
    print(f"Total files: {total}")
    print(f"Errors: {err_count}")
    print(f"CFR (heuristic): {cfr_count} | VFR (heuristic): {vfr_count}")
    print("FPS buckets:")
    for k in ["≈24", "≈30", "≈60", "other", "unknown"]:
        print(f" {k}: {summary[k]}")
# Run the report only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()