Newer
Older
ivus-complication-annotation-tool / app.py
"""
IVUS Complication Annotation Tool
Streamlit-based application for annotating IVUS images with complication predictions.
"""

import streamlit as st
import streamlit.components.v1 as components
from PIL import Image
import os
from typing import Optional, Union

# Tkinter import for folder/file dialog (Windows native dialog)
try:
    import tkinter as tk
    from tkinter import filedialog
    TKINTER_AVAILABLE = True
except ImportError:
    TKINTER_AVAILABLE = False

from utils import (
    get_all_case_ids,
    get_case_images,
    load_ground_truth_labels,
    validate_case_directory,
    save_annotation,
    get_annotated_cases
)

# UI Constants
PREDICTION_OPTIONS = ["あり", "なし"]  # 「あり」を上に
REASONS_OPTIONS = [
    "石灰化プラークが多い",
    "石灰化プラークが少ない",
    "減衰プラークが多い",
    "減衰プラークが少ない",
    "その他(記述)"
]

MIN_CONFIDENCE = 0
MAX_CONFIDENCE = 100
DEFAULT_CONFIDENCE = 50
CONFIDENCE_OPTIONS = [0, 25, 50, 75, 100]  # 5段階の確信度

# Image display settings
DEFAULT_IMAGE_WIDTH = 600  # Default image width in pixels (デフォルトより少し縮小)

# Page configuration
st.set_page_config(
    page_title="IVUS合併症アノテーションツール",
    page_icon="🫀",
    layout="wide",
    initial_sidebar_state="expanded"  # Hide sidebar to prevent scrolling
)


def inject_keyboard_shortcuts():
    """
    JavaScriptを入れて,キーボードの左右矢印キーで画面上の「◀」「▶」ボタンをプログラム的にクリックさせる
    """
    js_code = """
    <script>
    (function() {
        const doc = window.parent.document;
        
        // すでにリスナーが登録されている場合は何もしない(重複防止)
        if (window.parent._hasIVUSShortcuts) {
            return;
        }
        window.parent._hasIVUSShortcuts = true;

        doc.addEventListener('keydown', function(e) {
            const active = doc.activeElement;
            
            // 【重要】テキスト入力中のみ除外するロジックを修正
            // TEXTAREA は無条件で除外(コメント入力など)
            // INPUT は type="range" (スライダー) 以外の場合のみ除外(名前入力など)
            if (active.tagName === 'TEXTAREA') {
                return;
            }
            if (active.tagName === 'INPUT' && active.type !== 'range') {
                return;
            }

            const buttons = Array.from(doc.querySelectorAll('button'));

            if (e.key === 'ArrowLeft') {
                const prevBtn = buttons.find(b => b.innerText.includes('◀'));
                if (prevBtn) {
                    // ブラウザのデフォルト動作(スライダー移動など)をキャンセル
                    e.preventDefault();
                    e.stopPropagation();
                    prevBtn.click();
                }
            } else if (e.key === 'ArrowRight') {
                const nextBtn = buttons.find(b => b.innerText.includes('▶'));
                if (nextBtn) {
                    e.preventDefault();
                    e.stopPropagation();
                    nextBtn.click();
                }
            }
        });
    })();
    </script>
    """
    components.html(js_code, height=0)

def open_folder_dialog(initial_dir: str = ".") -> str:
    """
    Windows標準のフォルダ選択ダイアログを開き、選択されたパスを返す。

    Args:
        initial_dir: 初期表示ディレクトリ

    Returns:
        選択されたフォルダパス(キャンセル時は空文字列)

    Note:
        WSL環境では動作しない可能性があります。Windows環境での使用を想定。
    """
    if not TKINTER_AVAILABLE:
        return ""

    try:
        root = tk.Tk()
        root.withdraw()  # メインウィンドウを隠す
        root.wm_attributes('-topmost', 1)  # ダイアログを最前面に表示
        folder_path = filedialog.askdirectory(initialdir=initial_dir)
        root.destroy()
        return folder_path if folder_path else ""
    except Exception as e:
        print(f"フォルダ選択ダイアログエラー: {e}")
        return ""


def open_file_dialog(initial_dir: str = ".", file_types=None) -> str:
    """
    Windows標準のファイル選択ダイアログを開き、選択されたパスを返す。

    Args:
        initial_dir: 初期表示ディレクトリ
        file_types: ファイル種類のフィルタ

    Returns:
        選択されたファイルパス(キャンセル時は空文字列)

    Note:
        WSL環境では動作しない可能性があります。Windows環境での使用を想定。
    """
    if not TKINTER_AVAILABLE:
        return ""

    if file_types is None:
        file_types = [("Excel files", "*.xlsx;*.xls"), ("All files", "*.*")]

    try:
        root = tk.Tk()
        root.withdraw()
        root.wm_attributes('-topmost', 1)
        file_path = filedialog.askopenfilename(
            initialdir=initial_dir,
            filetypes=file_types
        )
        root.destroy()
        return file_path if file_path else ""
    except Exception as e:
        print(f"ファイル選択ダイアログエラー: {e}")
        return ""


def render_configuration_form():
    """
    起動時の設定フォームを表示。
    アノテーター名とデータパスを入力してから進む。
    フォルダ/ファイル選択ボタンも提供。
    """
    st.title("IVUS合併症アノテーションツール - 設定")
    st.markdown("---")

    # Initialize session state for paths if not exists (空の初期値)
    if 'temp_data_root' not in st.session_state:
        st.session_state.temp_data_root = ""
    if 'temp_excel_path' not in st.session_state:
        st.session_state.temp_excel_path = ""

    st.subheader("設定")

    # Annotator name input
    annotator = st.text_input(
        "アノテーター名 *",
        value="",
        help="あなたの名前を入力してください(必須)",
        placeholder="例: 田中"
    )

    st.markdown("---")

    # Data Directory Path with folder picker
    st.markdown("**データディレクトリパス** *")
    st.caption("症例フォルダ(CHIBAMI_xxx_pre)が含まれるディレクトリを選択してください")

    col1, col2 = st.columns([4, 1])
    with col1:
        data_root = st.text_input(
            "データディレクトリパス",
            value=st.session_state.temp_data_root,
            help="「📁 参照」ボタンでフォルダを選択してください",
            placeholder="「📁 参照」ボタンでフォルダを選択",
            label_visibility="collapsed"
        )
        # Update session state when user types
        if data_root != st.session_state.temp_data_root:
            st.session_state.temp_data_root = data_root

    with col2:
        if st.button("📁 参照", key="btn_browse_data", use_container_width=True):
            if TKINTER_AVAILABLE:
                # Use current path as initial dir if exists, otherwise use home directory
                initial = st.session_state.temp_data_root if st.session_state.temp_data_root else os.path.expanduser("~")
                selected_path = open_folder_dialog(initial_dir=initial)
                if selected_path:
                    st.session_state.temp_data_root = selected_path
                    st.rerun()
            else:
                st.warning("フォルダ選択機能はWindows環境でのみ利用可能です")

    # Show current path validation
    if data_root and os.path.exists(data_root):
        st.success(f"✓ フォルダが見つかりました: {data_root}")
    elif data_root:
        st.error(f"✗ フォルダが見つかりません: {data_root}")

    st.markdown("---")

    # Excel Label Path with file picker
    st.markdown("**Excelラベルファイルパス** *")
    st.caption("正解ラベルが記載されたExcelファイル(CHIBAMI_case_list_xxx.xlsx)を選択してください")

    col1, col2 = st.columns([4, 1])
    with col1:
        excel_path = st.text_input(
            "Excelラベルファイルパス",
            value=st.session_state.temp_excel_path,
            help="「📄 参照」ボタンでExcelファイルを選択してください",
            placeholder="「📄 参照」ボタンでファイルを選択",
            label_visibility="collapsed"
        )
        # Update session state when user types
        if excel_path != st.session_state.temp_excel_path:
            st.session_state.temp_excel_path = excel_path

    with col2:
        if st.button("📄 参照", key="btn_browse_excel", use_container_width=True):
            if TKINTER_AVAILABLE:
                # Use directory of current path as initial dir if exists
                initial = os.path.dirname(st.session_state.temp_excel_path) if st.session_state.temp_excel_path else os.path.expanduser("~")
                selected_path = open_file_dialog(initial_dir=initial)
                if selected_path:
                    st.session_state.temp_excel_path = selected_path
                    st.rerun()
            else:
                st.warning("ファイル選択機能はWindows環境でのみ利用可能です")

    # Show current file validation
    if excel_path and os.path.exists(excel_path):
        st.success(f"✓ ファイルが見つかりました: {excel_path}")
    elif excel_path:
        st.error(f"✗ ファイルが見つかりません: {excel_path}")

    st.markdown("---")

    # Submit button
    if st.button("アノテーション開始", type="primary", use_container_width=True):
        # Validation
        errors = []
        if not annotator or annotator.strip() == "":
            errors.append("❌ アノテーター名は必須です")
        if not data_root or data_root.strip() == "":
            errors.append("❌ データディレクトリパスは必須です - 「📁 参照」ボタンでフォルダを選択してください")
        elif not os.path.exists(data_root):
            errors.append(f"❌ データディレクトリが見つかりません: {data_root}")
        if not excel_path or excel_path.strip() == "":
            errors.append("❌ Excelラベルファイルパスは必須です - 「📄 参照」ボタンでファイルを選択してください")
        elif not os.path.exists(excel_path):
            errors.append(f"❌ Excelファイルが見つかりません: {excel_path}")

        if errors:
            for error in errors:
                st.error(error)
        else:
            # Save configuration to session state
            st.session_state.config_complete = True
            st.session_state.annotator = annotator.strip()
            st.session_state.data_root = data_root.strip()
            st.session_state.excel_path = excel_path.strip()
            st.session_state.csv_path = os.path.join(
                data_root.strip(),
                f"annotations_{annotator.strip()}.csv"
            )
            st.rerun()


def initialize_session_state():
    """設定完了後、セッション状態を初期化。"""
    if 'config_complete' not in st.session_state:
        st.session_state.config_complete = False

    if not st.session_state.config_complete:
        return

    # Load case IDs
    if 'case_ids' not in st.session_state:
        st.session_state.case_ids = get_all_case_ids(st.session_state.data_root)

    # Initialize current case index
    if 'current_case_idx' not in st.session_state:
        st.session_state.current_case_idx = 0

    # Load ground truth labels (not displayed, only for CSV saving)
    if 'ground_truth_labels' not in st.session_state:
        st.session_state.ground_truth_labels = load_ground_truth_labels(
            st.session_state.excel_path
        )

    # Load annotated cases
    if 'annotated_cases' not in st.session_state:
        st.session_state.annotated_cases = get_annotated_cases(
            st.session_state.csv_path,
            st.session_state.annotator
        )

    # Initialize zoom level (persistent across cases)
    if 'zoom_percent' not in st.session_state:
        st.session_state.zoom_percent = 80  # Default zoom level


def get_current_case_id() -> Optional[Union[int, float]]:
    """現在の症例IDを取得。"""
    if (st.session_state.current_case_idx >= 0 and
            st.session_state.current_case_idx < len(st.session_state.case_ids)):
        return st.session_state.case_ids[st.session_state.current_case_idx]
    return None


def render_sidebar_ui():
    """
    サイドバーに進捗、ナビゲーション、設定を集約。
    メイン画面の縦スペースを確保するために使用。
    """
    with st.sidebar:
        st.header(f"担当: {st.session_state.annotator}")
        
        # --- 進捗状況 ---
        total_cases = len(st.session_state.case_ids)
        annotated_count = len(st.session_state.annotated_cases)
        progress = annotated_count / total_cases if total_cases > 0 else 0
        st.progress(progress)
        st.caption(f"進捗: {progress:.1%} ({annotated_count}/{total_cases})")
        
        st.markdown("---")
        
        # --- 症例ナビゲーション ---
        st.subheader("症例選択")
        current_case_id = get_current_case_id()
        case_display = [
            f"症例 {cid}" + (" ✓" if cid in st.session_state.annotated_cases else "")
            for cid in st.session_state.case_ids
        ]

        # 症例選択のコールバック
        def on_case_change():
            pass
        
        selected_idx = st.selectbox(
            "症例リスト",
            range(len(st.session_state.case_ids)),
            index=st.session_state.current_case_idx,
            format_func=lambda i: case_display[i],
            label_visibility="collapsed"
        )
        
        if selected_idx != st.session_state.current_case_idx:
            st.session_state.current_case_idx = selected_idx
            st.rerun()

        # 前後の症例移動ボタンのコールバック
        def change_case(amount: int):
            new_idx = st.session_state.current_case_idx + amount
            if 0 <= new_idx < len(st.session_state.case_ids):
                st.session_state.current_case_idx = new_idx

        col1, col2 = st.columns(2)
        with col1:
            st.button(
                "← 前へ",
                disabled=st.session_state.current_case_idx == 0,
                use_container_width=True,
                on_click=change_case,
                args=(-1,)
            )
        with col2:
            st.button(
                "次へ →",
                disabled=st.session_state.current_case_idx >= len(st.session_state.case_ids) - 1,
                use_container_width=True,
                on_click=change_case,
                args=(1,)
            )
        
        st.markdown("---")

        # --- 画像拡大率 ---
        zoom_levels = [30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150]
        if st.session_state.zoom_percent not in zoom_levels:
            st.session_state.zoom_percent = 80
        
        current_idx = zoom_levels.index(st.session_state.zoom_percent)
        
        col_z1, col_z2, col_z3 = st.columns([1, 2, 1])
        
        with col_z1:
            if st.button("-", key="zoom_out", use_container_width=True):
                if current_idx > 0:
                    st.session_state.zoom_percent = zoom_levels[current_idx - 1]
                    st.rerun()
                    
        with col_z2:
            st.markdown(f"<div style='text-align: center; line-height: 2.3;'><b>{st.session_state.zoom_percent} %</b></div>", unsafe_allow_html=True)
            
        with col_z3:
            if st.button("+", key="zoom_in", use_container_width=True):
                if current_idx < len(zoom_levels) - 1:
                    st.session_state.zoom_percent = zoom_levels[current_idx + 1]
                    st.rerun()


def render_image_viewer(case_id: Union[int, float], container):
    """
    スライダー付き画像ビューアーを表示。

    Args:
        case_id: 現在の症例ID
        container: Streamlit container to render in
    """
    images = get_case_images(st.session_state.data_root, case_id)

    if len(images) == 0:
        container.error(f"症例 {case_id} の画像が見つかりません")
        return

    # Initialize frame index in session state if not exists
    frame_key = f"frame_idx_{case_id}"
    if frame_key not in st.session_state:
        st.session_state[frame_key] = 0

    # ボタンが押されたときに実行される関数
    def change_frame(amount: int):
        current = st.session_state[frame_key]
        new_val = current + amount
        if 0 <= new_val < len(images):
            st.session_state[frame_key] = new_val
            # st.rerun()

    # Image slider
    frame_idx = container.slider(
        "フレーム番号",
        min_value=0,
        max_value=len(images) - 1,
        value=st.session_state[frame_key],
        key=frame_key,
        label_visibility="collapsed",
        # on_change=lambda: None  # Ensure state updates
    )

    # Update session state from slider
    # if frame_idx != st.session_state[frame_key]:
    #     st.session_state[frame_key] = frame_idx

    # Display image with custom width based on global zoom level
    try:
        img = Image.open(images[frame_idx])

        # Calculate display width based on zoom percentage from session state
        display_width = int(DEFAULT_IMAGE_WIDTH * st.session_state.zoom_percent / 100)

        container.image(
            img,
            caption=f"症例 {case_id} - フレーム {frame_idx + 1}/{len(images)}",
            width=display_width
        )

        # Arrow buttons for frame navigation (画像の右下に配置)
        col1, col2, col3 = container.columns([4, 1, 1])
        with col1:
            st.write("")  # Spacer
        with col2:
            st.button(
                "◀",
                key=f"prev_{case_id}",
                on_click=change_frame,
                args=(-1,),
            )
            # if st.button("◀", key=f"prev_{case_id}"):
            #     if frame_idx > 0:
            #         st.session_state[frame_key] = frame_idx - 1
            #         st.rerun()
        with col3:
            st.button(
                "▶",
                key=f"next_{case_id}",
                on_click=change_frame,
                args=(1,),
            )
            # if st.button("▶", key=f"next_{case_id}", disabled=frame_idx >= len(images) - 1):
            #     if frame_idx < len(images) - 1:
            #         st.session_state[frame_key] = frame_idx + 1
            #         st.rerun()

    except Exception as e:
        container.error(f"画像読み込みエラー: {e}")

    # Keyboard navigation support (キーボードの矢印キーでフレーム移動)
    container.caption("💡 ヒント: スライダーをクリックしてから矢印キー(←→)でフレーム移動、または◀▶ボタンをクリックできます")
    inject_keyboard_shortcuts()


def render_annotation_form(container, case_id: Union[int, float]) -> dict:
    """アノテーション入力フォームを表示 - コンパクト版"""
    
    # Q1: Prediction (横並び)
    container.markdown("**Q1: 合併症予測**")
    prediction = container.radio(
        "Q1: 合併症予測",
        PREDICTION_OPTIONS,
        key=f"prediction_{case_id}",
        horizontal=True,  # 変更: 横並びにして省スペース化
        label_visibility="collapsed"
    )

    # Q2: Confidence (横並び)
    container.markdown("**Q2: 確信度**")
    confidence = container.radio(
        "確信度",
        CONFIDENCE_OPTIONS,
        index=2,  # 50%
        format_func=lambda x: f"{x}%",
        key=f"confidence_{case_id}",
        label_visibility="collapsed",
        horizontal=True  # 変更: 横並び
    )

    # Q3: Reasons
    container.markdown("**Q3: 判断根拠**")
    reasons = []
    # チェックボックスは項目の長さによるため縦並びのまま
    for reason in REASONS_OPTIONS:
        if container.checkbox(reason, key=f"reason_{case_id}_{reason}"):
            reasons.append(reason)

    # Q4: Comment (高さを最小限に)
    container.markdown("**Q4: 自由記述**")
    comment = container.text_area(
        "Q4: 自由記述",
        value="",
        key=f"comment_{case_id}",
        height=120,
        label_visibility="collapsed"
    )

    return {
        'prediction': prediction,
        'confidence': confidence,
        'reasons': reasons,
        'comment': comment
    }


def validate_annotation(annotation_data: dict) -> tuple[bool, str]:
    """
    保存前にアノテーションデータを検証。

    Args:
        annotation_data: アノテーションフィールドの辞書

    Returns:
        (有効かどうか, エラーメッセージ) のタプル
    """
    if len(annotation_data['reasons']) == 0:
        return False, "判断根拠を最低1つ選択してください (Q3)"

    return True, ""


def save_and_advance(container, case_id: Union[int, float], annotation_data: dict):
    """
    アノテーションを保存して次の症例へ進む。

    Args:
        container: Streamlit container to display messages in
        case_id: 現在の症例ID
        annotation_data: アノテーションフィールドの辞書
    """
    is_valid, error_msg = validate_annotation(annotation_data)

    if not is_valid:
        container.error(error_msg)
        return

    try:
        # Get ground truth label for this case (may be None)
        ground_truth = st.session_state.ground_truth_labels.get(case_id, None)

        save_annotation(
            csv_path=st.session_state.csv_path,
            case_id=case_id,
            prediction=annotation_data['prediction'],
            confidence=annotation_data['confidence'],
            reasons=annotation_data['reasons'],
            comment=annotation_data['comment'],
            annotator=st.session_state.annotator,
            ground_truth=ground_truth
        )

        # Update annotated cases
        st.session_state.annotated_cases.add(case_id)

        # Show success message
        container.success(f"✓ 症例 {case_id} を保存しました!")

        # Auto-advance to next unannotated case
        next_idx = st.session_state.current_case_idx + 1
        while next_idx < len(st.session_state.case_ids):
            next_case_id = st.session_state.case_ids[next_idx]
            if next_case_id not in st.session_state.annotated_cases:
                st.session_state.current_case_idx = next_idx
                st.rerun()
                return
            next_idx += 1

        # If no more unannotated cases, stay on current or show completion
        if next_idx < len(st.session_state.case_ids):
            st.session_state.current_case_idx = next_idx
            st.rerun()
        else:
            container.info("全症例のアノテーションが完了しました! 🎉")

    except Exception as e:
        container.error(f"保存エラー: {e}")


def main():
    """メインアプリケーションエントリーポイント。"""
    initialize_session_state()

    # Show configuration form if not completed
    if not st.session_state.config_complete:
        render_configuration_form()
        return

    # Check if there are any cases
    if len(st.session_state.case_ids) == 0:
        st.error(f"{st.session_state.data_root} に症例が見つかりません")
        st.info("アプリケーションを再起動して、データディレクトリパスを確認してください。")
        return

    render_sidebar_ui()

    current_case_id = get_current_case_id()

    if current_case_id is None:
        st.error("利用可能な症例がありません")
        return

    # Validate case directory
    is_valid, error_msg = validate_case_directory(
        st.session_state.data_root,
        current_case_id
    )
    if not is_valid:
        st.error(error_msg)
        return

    # Main layout: Left - Image viewer, Right - Navigation + Annotation
    left_col, right_col = st.columns([3, 1])

    # Left side: Image viewer
    with left_col:
        render_image_viewer(current_case_id, left_col)

    # Right side: Navigation and Annotation
    with right_col:
        # Annotation form section
        annotation_data = render_annotation_form(right_col, current_case_id)

        # Save button
        if st.button("💾 保存して次へ", type="primary", use_container_width=True, key=f"save_{current_case_id}"):
            save_and_advance(right_col, current_case_id, annotation_data)


if __name__ == "__main__":
    main()