# Source: RARP / py_ffprobe.py
# Last change: @delAguila, "Final Commit." (5 KB)
import argparse
import csv
import json
import os
import subprocess
from pathlib import Path
from tqdm import tqdm

# File suffixes (dot included) that identify a video file; compared against
# the lower-cased suffix of each candidate path.
VIDEO_EXTS = {
    ".avi",
    ".m4v",
    ".mkv",
    ".mov",
    ".mp4",
    ".mpeg",
    ".mpg",
    ".ts",
    ".webm",
}

def run_ffprobe(ffprobe_path: str, video_path: Path) -> dict | None:
    """
    Returns a dict with codec_name, r_frame_rate, avg_frame_rate, time_base, duration, format_name.
    Uses JSON output for robust parsing.
    """
    cmd = [
        ffprobe_path,
        "-hide_banner",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries",
        "stream=codec_name,r_frame_rate,avg_frame_rate,time_base",
        "-show_entries",
        "format=duration,format_name",
        "-of", "json",
        str(video_path),
    ]
    try:
        out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
        data = json.loads(out)
        stream = (data.get("streams") or [{}])[0]
        fmt = data.get("format") or {}
        return {
            "codec_name": stream.get("codec_name", ""),
            "r_frame_rate": stream.get("r_frame_rate", ""),
            "avg_frame_rate": stream.get("avg_frame_rate", ""),
            "time_base": stream.get("time_base", ""),
            "format_name": fmt.get("format_name", ""),
            "duration": fmt.get("duration", ""),
        }
    except subprocess.CalledProcessError as e:
        return {"error": e.output.strip()}
    except Exception as e:
        return {"error": str(e)}

def frac_to_float(frac: str) -> float | None:
    """Convert '30000/1001' to float. Returns None if empty/invalid."""
    if not frac or frac == "0/0":
        return None
    try:
        num, den = frac.split("/")
        num_f = float(num)
        den_f = float(den)
        if den_f == 0:
            return None
        return num_f / den_f
    except Exception:
        return None

def is_vfr(r_rate: str, avg_rate: str, tol: float = 1e-6) -> bool | None:
    """
    Guess whether a stream is variable-frame-rate from its two reported rates.

    Both arguments are fraction strings parsed with frac_to_float. The stream
    is flagged as likely VFR when the parsed rates differ by more than `tol`.
    Returns None when either rate is missing or cannot be parsed.
    """
    parsed = [frac_to_float(r_rate), frac_to_float(avg_rate)]
    if any(rate is None for rate in parsed):
        return None
    real_rate, average_rate = parsed
    return abs(real_rate - average_rate) > tol

def bucket_fps(fps: float | None) -> str:
    """Bucket fps into 24/30/60/other (rough) for summary."""
    if fps is None:
        return "unknown"
    # handle 23.976, 24.0
    if abs(fps - 24.0) < 0.2 or abs(fps - 23.976) < 0.2:
        return "≈24"
    if abs(fps - 30.0) < 0.2 or abs(fps - 29.97) < 0.2:
        return "≈30"
    if abs(fps - 60.0) < 0.3 or abs(fps - 59.94) < 0.3:
        return "≈60"
    return "other"

def iter_videos(root: Path):
    """Yield every regular file below `root` (recursively) whose lower-cased suffix is in VIDEO_EXTS."""
    yield from (
        candidate
        for candidate in root.rglob("*")
        if candidate.is_file() and candidate.suffix.lower() in VIDEO_EXTS
    )

def main():
    """
    Probe every video under a directory with ffprobe and write a CSV report.

    Command-line arguments:
        input_dir: directory searched recursively for video files.
        --ffprobe: ffprobe executable to run (default: "ffprobe").
        --out: path of the CSV report (default: "ffprobe_report.csv").

    One CSV row is written per video file found. Files that could not be
    probed get a row with only "path", "error" and fps_bucket="unknown"
    filled in. After writing the CSV, totals (files, errors, CFR/VFR counts
    and fps buckets) are printed to stdout.

    Exits through argparse's error handling when input_dir is not an
    existing directory.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("input_dir", help="Directory containing videos")
    ap.add_argument("--ffprobe", default="ffprobe", help="Path to ffprobe (default: ffprobe)")
    ap.add_argument("--out", default="ffprobe_report.csv", help="Output CSV path")
    args = ap.parse_args()

    root = Path(args.input_dir).expanduser().resolve()
    out_csv = Path(args.out).expanduser().resolve()

    # Fail fast on a bad input path instead of writing an empty report that
    # looks like a successful scan of a directory with no videos.
    if not root.is_dir():
        ap.error(f"input_dir is not a directory: {root}")

    # Single source of truth for both the row template and the CSV header.
    fieldnames = [
        "path", "codec_name", "format_name", "duration_sec",
        "r_frame_rate", "avg_frame_rate", "r_fps", "avg_fps",
        "vfr", "time_base", "fps_bucket", "error"
    ]

    rows = []
    summary = {"≈24": 0, "≈30": 0, "≈60": 0, "other": 0, "unknown": 0}
    vfr_count = 0
    cfr_count = 0
    err_count = 0

    for vid in tqdm(iter_videos(root)):
        info = run_ffprobe(args.ffprobe, vid)
        row = dict.fromkeys(fieldnames, "")
        row["path"] = str(vid)

        # Probe failure: record the message and count the file as "unknown".
        if info is None or "error" in info:
            row["error"] = (info or {}).get("error", "unknown error")
            err_count += 1
            row["fps_bucket"] = "unknown"
            summary["unknown"] += 1
            rows.append(row)
            continue

        row["codec_name"] = info["codec_name"]
        row["format_name"] = info["format_name"]
        row["duration_sec"] = info["duration"]
        row["r_frame_rate"] = info["r_frame_rate"]
        row["avg_frame_rate"] = info["avg_frame_rate"]
        row["time_base"] = info["time_base"]

        r_fps = frac_to_float(info["r_frame_rate"])
        a_fps = frac_to_float(info["avg_frame_rate"])
        row["r_fps"] = f"{r_fps:.6f}" if r_fps is not None else ""
        row["avg_fps"] = f"{a_fps:.6f}" if a_fps is not None else ""

        # Files whose VFR status cannot be determined keep an empty "vfr"
        # cell and are counted as neither CFR nor VFR.
        vfr = is_vfr(info["r_frame_rate"], info["avg_frame_rate"])
        if vfr is not None:
            row["vfr"] = "1" if vfr else "0"
            if vfr:
                vfr_count += 1
            else:
                cfr_count += 1

        # bucket using avg fps if available, else r fps
        fps_for_bucket = a_fps if a_fps is not None else r_fps
        b = bucket_fps(fps_for_bucket)
        row["fps_bucket"] = b
        summary[b] += 1

        rows.append(row)

    # Write CSV
    out_csv.parent.mkdir(parents=True, exist_ok=True)
    with out_csv.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows)

    total = len(rows)
    print(f"Wrote: {out_csv}")
    print(f"Total files: {total}")
    print(f"Errors: {err_count}")
    print(f"CFR (heuristic): {cfr_count} | VFR (heuristic): {vfr_count}")
    print("FPS buckets:")
    for k in ["≈24", "≈30", "≈60", "other", "unknown"]:
        print(f"  {k}: {summary[k]}")

# Run the command-line entry point only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()