Newer
Older
ivus-complication-annotation-tool / app.py
"""
IVUS Complication Annotation Tool
Streamlit-based application for annotating IVUS images with complication predictions.
"""

import streamlit as st
from PIL import Image
import os
from typing import Optional, Union

# Tkinter import for folder/file dialog (Windows native dialog)
try:
    import tkinter as tk
    from tkinter import filedialog
    TKINTER_AVAILABLE = True
except ImportError:
    TKINTER_AVAILABLE = False

from utils import (
    get_all_case_ids,
    get_case_images,
    load_ground_truth_labels,
    validate_case_directory,
    save_annotation,
    get_annotated_cases
)

# UI Constants
PREDICTION_OPTIONS = ["あり", "なし"]  # 「あり」を上に
REASONS_OPTIONS = [
    "石灰化プラークが多い",
    "石灰化プラークが少ない",
    "減衰プラークが多い",
    "減衰プラークが少ない",
    "その他(記述)"
]

MIN_CONFIDENCE = 0
MAX_CONFIDENCE = 100
DEFAULT_CONFIDENCE = 50

# Image display settings
DEFAULT_IMAGE_WIDTH = 600  # Default image width in pixels (デフォルトより少し縮小)

# Page configuration
st.set_page_config(
    page_title="IVUS合併症アノテーションツール",
    page_icon="🫀",
    layout="wide"
)


def open_folder_dialog(initial_dir: str = ".") -> str:
    """
    Windows標準のフォルダ選択ダイアログを開き、選択されたパスを返す。

    Args:
        initial_dir: 初期表示ディレクトリ

    Returns:
        選択されたフォルダパス(キャンセル時は空文字列)

    Note:
        WSL環境では動作しない可能性があります。Windows環境での使用を想定。
    """
    if not TKINTER_AVAILABLE:
        return ""

    try:
        root = tk.Tk()
        root.withdraw()  # メインウィンドウを隠す
        root.wm_attributes('-topmost', 1)  # ダイアログを最前面に表示
        folder_path = filedialog.askdirectory(initialdir=initial_dir)
        root.destroy()
        return folder_path if folder_path else ""
    except Exception as e:
        print(f"フォルダ選択ダイアログエラー: {e}")
        return ""


def open_file_dialog(initial_dir: str = ".", file_types=None) -> str:
    """
    Windows標準のファイル選択ダイアログを開き、選択されたパスを返す。

    Args:
        initial_dir: 初期表示ディレクトリ
        file_types: ファイル種類のフィルタ

    Returns:
        選択されたファイルパス(キャンセル時は空文字列)

    Note:
        WSL環境では動作しない可能性があります。Windows環境での使用を想定。
    """
    if not TKINTER_AVAILABLE:
        return ""

    if file_types is None:
        file_types = [("Excel files", "*.xlsx;*.xls"), ("All files", "*.*")]

    try:
        root = tk.Tk()
        root.withdraw()
        root.wm_attributes('-topmost', 1)
        file_path = filedialog.askopenfilename(
            initialdir=initial_dir,
            filetypes=file_types
        )
        root.destroy()
        return file_path if file_path else ""
    except Exception as e:
        print(f"ファイル選択ダイアログエラー: {e}")
        return ""


def render_configuration_form():
    """
    起動時の設定フォームを表示。
    アノテーター名とデータパスを入力してから進む。
    フォルダ/ファイル選択ボタンも提供。
    """
    st.title("IVUS合併症アノテーションツール - 設定")
    st.markdown("---")

    # Initialize session state for paths if not exists (空の初期値)
    if 'temp_data_root' not in st.session_state:
        st.session_state.temp_data_root = ""
    if 'temp_excel_path' not in st.session_state:
        st.session_state.temp_excel_path = ""

    st.subheader("設定")

    # Annotator name input
    annotator = st.text_input(
        "アノテーター名 *",
        value="",
        help="あなたの名前を入力してください(必須)",
        placeholder="例: 田中"
    )

    st.markdown("---")

    # Data Directory Path with folder picker
    st.markdown("**データディレクトリパス** *")
    st.caption("症例フォルダ(CHIBAMI_xxx_pre)が含まれるディレクトリを選択してください")

    col1, col2 = st.columns([4, 1])
    with col1:
        data_root = st.text_input(
            "データディレクトリパス",
            value=st.session_state.temp_data_root,
            help="「📁 参照」ボタンでフォルダを選択してください",
            placeholder="「📁 参照」ボタンでフォルダを選択",
            label_visibility="collapsed"
        )
        # Update session state when user types
        if data_root != st.session_state.temp_data_root:
            st.session_state.temp_data_root = data_root

    with col2:
        if st.button("📁 参照", key="btn_browse_data", use_container_width=True):
            if TKINTER_AVAILABLE:
                # Use current path as initial dir if exists, otherwise use home directory
                initial = st.session_state.temp_data_root if st.session_state.temp_data_root else os.path.expanduser("~")
                selected_path = open_folder_dialog(initial_dir=initial)
                if selected_path:
                    st.session_state.temp_data_root = selected_path
                    st.rerun()
            else:
                st.warning("フォルダ選択機能はWindows環境でのみ利用可能です")

    # Show current path validation
    if data_root and os.path.exists(data_root):
        st.success(f"✓ フォルダが見つかりました: {data_root}")
    elif data_root:
        st.error(f"✗ フォルダが見つかりません: {data_root}")

    st.markdown("---")

    # Excel Label Path with file picker
    st.markdown("**Excelラベルファイルパス** *")
    st.caption("正解ラベルが記載されたExcelファイル(CHIBAMI_case_list_xxx.xlsx)を選択してください")

    col1, col2 = st.columns([4, 1])
    with col1:
        excel_path = st.text_input(
            "Excelラベルファイルパス",
            value=st.session_state.temp_excel_path,
            help="「📄 参照」ボタンでExcelファイルを選択してください",
            placeholder="「📄 参照」ボタンでファイルを選択",
            label_visibility="collapsed"
        )
        # Update session state when user types
        if excel_path != st.session_state.temp_excel_path:
            st.session_state.temp_excel_path = excel_path

    with col2:
        if st.button("📄 参照", key="btn_browse_excel", use_container_width=True):
            if TKINTER_AVAILABLE:
                # Use directory of current path as initial dir if exists
                initial = os.path.dirname(st.session_state.temp_excel_path) if st.session_state.temp_excel_path else os.path.expanduser("~")
                selected_path = open_file_dialog(initial_dir=initial)
                if selected_path:
                    st.session_state.temp_excel_path = selected_path
                    st.rerun()
            else:
                st.warning("ファイル選択機能はWindows環境でのみ利用可能です")

    # Show current file validation
    if excel_path and os.path.exists(excel_path):
        st.success(f"✓ ファイルが見つかりました: {excel_path}")
    elif excel_path:
        st.error(f"✗ ファイルが見つかりません: {excel_path}")

    st.markdown("---")

    # Submit button
    if st.button("アノテーション開始", type="primary", use_container_width=True):
        # Validation
        errors = []
        if not annotator or annotator.strip() == "":
            errors.append("❌ アノテーター名は必須です")
        if not data_root or data_root.strip() == "":
            errors.append("❌ データディレクトリパスは必須です - 「📁 参照」ボタンでフォルダを選択してください")
        elif not os.path.exists(data_root):
            errors.append(f"❌ データディレクトリが見つかりません: {data_root}")
        if not excel_path or excel_path.strip() == "":
            errors.append("❌ Excelラベルファイルパスは必須です - 「📄 参照」ボタンでファイルを選択してください")
        elif not os.path.exists(excel_path):
            errors.append(f"❌ Excelファイルが見つかりません: {excel_path}")

        if errors:
            for error in errors:
                st.error(error)
        else:
            # Save configuration to session state
            st.session_state.config_complete = True
            st.session_state.annotator = annotator.strip()
            st.session_state.data_root = data_root.strip()
            st.session_state.excel_path = excel_path.strip()
            st.session_state.csv_path = os.path.join(
                data_root.strip(),
                f"annotations_{annotator.strip()}.csv"
            )
            st.rerun()


def initialize_session_state():
    """設定完了後、セッション状態を初期化。"""
    if 'config_complete' not in st.session_state:
        st.session_state.config_complete = False

    if not st.session_state.config_complete:
        return

    # Load case IDs
    if 'case_ids' not in st.session_state:
        st.session_state.case_ids = get_all_case_ids(st.session_state.data_root)

    # Initialize current case index
    if 'current_case_idx' not in st.session_state:
        st.session_state.current_case_idx = 0

    # Load ground truth labels (not displayed, only for CSV saving)
    if 'ground_truth_labels' not in st.session_state:
        st.session_state.ground_truth_labels = load_ground_truth_labels(
            st.session_state.excel_path
        )

    # Load annotated cases
    if 'annotated_cases' not in st.session_state:
        st.session_state.annotated_cases = get_annotated_cases(
            st.session_state.csv_path,
            st.session_state.annotator
        )


def get_current_case_id() -> Optional[Union[int, float]]:
    """現在の症例IDを取得。"""
    if (st.session_state.current_case_idx >= 0 and
            st.session_state.current_case_idx < len(st.session_state.case_ids)):
        return st.session_state.case_ids[st.session_state.current_case_idx]
    return None


def render_header():
    """進捗情報を含むヘッダーを表示。"""
    st.title("IVUS合併症アノテーションツール")

    total_cases = len(st.session_state.case_ids)
    annotated_count = len(st.session_state.annotated_cases)

    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("アノテーター", st.session_state.annotator)
    with col2:
        st.metric("全症例数", total_cases)
    with col3:
        st.metric("完了数", annotated_count)
    with col4:
        progress = annotated_count / total_cases if total_cases > 0 else 0
        st.metric("進捗", f"{progress:.1%}")


def render_case_selector(container):
    """症例ナビゲーションを表示(右側に配置)。

    Args:
        container: Streamlit container to render in
    """
    container.header("症例ナビゲーション")

    current_case_id = get_current_case_id()

    # Case selector dropdown
    case_display = [
        f"症例 {cid}" + (" ✓" if cid in st.session_state.annotated_cases else "")
        for cid in st.session_state.case_ids
    ]

    selected_idx = container.selectbox(
        "症例を選択",
        range(len(st.session_state.case_ids)),
        index=st.session_state.current_case_idx,
        format_func=lambda i: case_display[i]
    )

    if selected_idx != st.session_state.current_case_idx:
        st.session_state.current_case_idx = selected_idx
        st.rerun()

    # Previous/Next buttons
    col1, col2 = container.columns(2)
    with col1:
        if st.button("← 前へ", disabled=st.session_state.current_case_idx == 0):
            st.session_state.current_case_idx -= 1
            st.rerun()
    with col2:
        if st.button(
            "次へ →",
            disabled=st.session_state.current_case_idx >= len(st.session_state.case_ids) - 1
        ):
            st.session_state.current_case_idx += 1
            st.rerun()

    # Display current case info
    if current_case_id:
        container.info(f"**現在の症例:** {current_case_id}")


def render_image_viewer(case_id: Union[int, float], container):
    """
    スライダー付き画像ビューアーを表示。

    Args:
        case_id: 現在の症例ID
        container: Streamlit container to render in
    """
    images = get_case_images(st.session_state.data_root, case_id)

    if len(images) == 0:
        container.error(f"症例 {case_id} の画像が見つかりません")
        return

    # Initialize frame index in session state if not exists
    frame_key = f"frame_idx_{case_id}"
    if frame_key not in st.session_state:
        st.session_state[frame_key] = 0

    # Image zoom slider (拡大率の調整)
    zoom_percent = container.slider(
        "画像拡大率 (%)",
        min_value=30,
        max_value=150,
        value=80,  # デフォルト値を少し縮小
        step=10,
        key=f"zoom_{case_id}"
    )

    # Image slider
    frame_idx = container.slider(
        "フレーム番号",
        min_value=0,
        max_value=len(images) - 1,
        value=st.session_state[frame_key],
        key=f"frame_slider_{case_id}"
    )

    # Update session state
    st.session_state[frame_key] = frame_idx

    # Display image with custom width
    try:
        img = Image.open(images[frame_idx])

        # Calculate display width based on zoom percentage
        display_width = int(DEFAULT_IMAGE_WIDTH * zoom_percent / 100)

        container.image(
            img,
            caption=f"症例 {case_id} - フレーム {frame_idx + 1}/{len(images)}",
            width=display_width
        )

        # Arrow buttons for frame navigation (画像の右下に配置)
        col1, col2, col3 = container.columns([4, 1, 1])
        with col1:
            st.write("")  # Spacer
        with col2:
            if st.button("◀", key=f"prev_frame_{case_id}", disabled=frame_idx == 0, use_container_width=True):
                st.session_state[frame_key] = frame_idx - 1
                st.rerun()
        with col3:
            if st.button("▶", key=f"next_frame_{case_id}", disabled=frame_idx >= len(images) - 1, use_container_width=True):
                st.session_state[frame_key] = frame_idx + 1
                st.rerun()

    except Exception as e:
        container.error(f"画像読み込みエラー: {e}")

    # Keyboard navigation support (キーボードの矢印キーでフレーム移動)
    container.caption("💡 ヒント: スライダーをクリックしてから矢印キー(←→)でフレーム移動、または◀▶ボタンをクリックできます")


def render_annotation_form(case_id: Union[int, float]) -> dict:
    """
    アノテーション入力フォームを表示。

    Args:
        case_id: 現在の症例ID

    Returns:
        アノテーションデータの辞書
    """
    st.sidebar.markdown("---")
    st.sidebar.header("アノテーション")

    # Display annotator name (read-only)
    st.sidebar.text(f"アノテーター: {st.session_state.annotator}")
    st.sidebar.markdown("---")

    # Q1: Prediction
    prediction = st.sidebar.radio(
        "Q1: 合併症予測 (No reflow/Slow flow)",
        PREDICTION_OPTIONS,
        key=f"prediction_{case_id}"
    )

    # Q2: Confidence
    confidence = st.sidebar.slider(
        "Q2: 確信度 (%)",
        min_value=MIN_CONFIDENCE,
        max_value=MAX_CONFIDENCE,
        value=DEFAULT_CONFIDENCE,
        key=f"confidence_{case_id}"
    )

    # Q3: Reasons
    st.sidebar.markdown("**Q3: 判断根拠** (複数選択可)")
    reasons = []
    for reason in REASONS_OPTIONS:
        if st.sidebar.checkbox(reason, key=f"reason_{case_id}_{reason}"):
            reasons.append(reason)

    # Q4: Free text comment
    comment = st.sidebar.text_area(
        "Q4: 自由記述欄",
        value="",
        key=f"comment_{case_id}",
        height=100
    )

    return {
        'prediction': prediction,
        'confidence': confidence,
        'reasons': reasons,
        'comment': comment
    }


def validate_annotation(annotation_data: dict) -> tuple[bool, str]:
    """
    保存前にアノテーションデータを検証。

    Args:
        annotation_data: アノテーションフィールドの辞書

    Returns:
        (有効かどうか, エラーメッセージ) のタプル
    """
    if len(annotation_data['reasons']) == 0:
        return False, "判断根拠を最低1つ選択してください (Q3)"

    return True, ""


def save_and_advance(case_id: Union[int, float], annotation_data: dict):
    """
    アノテーションを保存して次の症例へ進む。

    Args:
        case_id: 現在の症例ID
        annotation_data: アノテーションフィールドの辞書
    """
    is_valid, error_msg = validate_annotation(annotation_data)

    if not is_valid:
        st.sidebar.error(error_msg)
        return

    try:
        # Get ground truth label for this case (may be None)
        ground_truth = st.session_state.ground_truth_labels.get(case_id, None)

        save_annotation(
            csv_path=st.session_state.csv_path,
            case_id=case_id,
            prediction=annotation_data['prediction'],
            confidence=annotation_data['confidence'],
            reasons=annotation_data['reasons'],
            comment=annotation_data['comment'],
            annotator=st.session_state.annotator,
            ground_truth=ground_truth
        )

        # Update annotated cases
        st.session_state.annotated_cases.add(case_id)

        # Show success message
        st.sidebar.success(f"✓ 症例 {case_id} を保存しました!")

        # Auto-advance to next unannotated case
        next_idx = st.session_state.current_case_idx + 1
        while next_idx < len(st.session_state.case_ids):
            next_case_id = st.session_state.case_ids[next_idx]
            if next_case_id not in st.session_state.annotated_cases:
                st.session_state.current_case_idx = next_idx
                st.rerun()
                return
            next_idx += 1

        # If no more unannotated cases, stay on current or show completion
        if next_idx < len(st.session_state.case_ids):
            st.session_state.current_case_idx = next_idx
            st.rerun()
        else:
            st.sidebar.info("全症例のアノテーションが完了しました! 🎉")

    except Exception as e:
        st.sidebar.error(f"保存エラー: {e}")


def main():
    """メインアプリケーションエントリーポイント。"""
    initialize_session_state()

    # Show configuration form if not completed
    if not st.session_state.config_complete:
        render_configuration_form()
        return

    # Check if there are any cases
    if len(st.session_state.case_ids) == 0:
        st.error(f"{st.session_state.data_root} に症例が見つかりません")
        st.info("アプリケーションを再起動して、データディレクトリパスを確認してください。")
        return

    render_header()

    current_case_id = get_current_case_id()

    if current_case_id is None:
        st.error("利用可能な症例がありません")
        return

    # Validate case directory
    is_valid, error_msg = validate_case_directory(
        st.session_state.data_root,
        current_case_id
    )
    if not is_valid:
        st.error(error_msg)
        return

    # Main layout: Left - Image viewer, Right - Navigation + Annotation
    left_col, right_col = st.columns([3, 1])

    # Left side: Image viewer
    with left_col:
        render_image_viewer(current_case_id, left_col)

    # Right side: Navigation and Annotation
    with right_col:
        render_case_selector(right_col)
        st.markdown("---")

    # Annotation form in sidebar (keeps original position)
    annotation_data = render_annotation_form(current_case_id)

    # Save button
    if st.sidebar.button("💾 保存して次へ", type="primary", use_container_width=True):
        save_and_advance(current_case_id, annotation_data)


if __name__ == "__main__":
    main()