"""
IVUS Complication Annotation Tool
Streamlit-based application for annotating IVUS images with complication predictions.
"""
import streamlit as st
import streamlit.components.v1 as components
from PIL import Image
import os
from typing import Optional, Union
# Tkinter import for folder/file dialog (Windows native dialog)
try:
import tkinter as tk
from tkinter import filedialog
TKINTER_AVAILABLE = True
except ImportError:
TKINTER_AVAILABLE = False
from utils import (
get_all_case_ids,
get_case_images,
load_ground_truth_labels,
validate_case_directory,
save_annotation,
get_annotated_cases
)
# UI Constants
PREDICTION_OPTIONS = ["あり", "なし"] # 「あり」を上に
REASONS_OPTIONS = [
"石灰化プラークが多い",
"石灰化プラークが少ない",
"減衰プラークが多い",
"減衰プラークが少ない",
"その他(記述)"
]
MIN_CONFIDENCE = 0
MAX_CONFIDENCE = 100
DEFAULT_CONFIDENCE = 50
CONFIDENCE_OPTIONS = [0, 25, 50, 75, 100] # 5段階の確信度
# Image display settings
DEFAULT_IMAGE_WIDTH = 600 # Default image width in pixels (デフォルトより少し縮小)
# Page configuration
st.set_page_config(
page_title="IVUS合併症アノテーションツール",
page_icon="🫀",
layout="wide",
initial_sidebar_state="expanded" # Hide sidebar to prevent scrolling
)
def inject_keyboard_shortcuts():
"""
JavaScriptを入れて,キーボードの左右矢印キーで画面上の「◀」「▶」ボタンをプログラム的にクリックさせる
"""
js_code = """
<script>
(function() {
const doc = window.parent.document;
// すでにリスナーが登録されている場合は何もしない(重複防止)
if (window.parent._hasIVUSShortcuts) {
return;
}
window.parent._hasIVUSShortcuts = true;
doc.addEventListener('keydown', function(e) {
const active = doc.activeElement;
// 【重要】テキスト入力中のみ除外するロジックを修正
// TEXTAREA は無条件で除外(コメント入力など)
// INPUT は type="range" (スライダー) 以外の場合のみ除外(名前入力など)
if (active.tagName === 'TEXTAREA') {
return;
}
if (active.tagName === 'INPUT' && active.type !== 'range') {
return;
}
const buttons = Array.from(doc.querySelectorAll('button'));
if (e.key === 'ArrowLeft') {
const prevBtn = buttons.find(b => b.innerText.includes('◀'));
if (prevBtn) {
// ブラウザのデフォルト動作(スライダー移動など)をキャンセル
e.preventDefault();
e.stopPropagation();
prevBtn.click();
}
} else if (e.key === 'ArrowRight') {
const nextBtn = buttons.find(b => b.innerText.includes('▶'));
if (nextBtn) {
e.preventDefault();
e.stopPropagation();
nextBtn.click();
}
}
});
})();
</script>
"""
components.html(js_code, height=0)
def open_folder_dialog(initial_dir: str = ".") -> str:
"""
Windows標準のフォルダ選択ダイアログを開き、選択されたパスを返す。
Args:
initial_dir: 初期表示ディレクトリ
Returns:
選択されたフォルダパス(キャンセル時は空文字列)
Note:
WSL環境では動作しない可能性があります。Windows環境での使用を想定。
"""
if not TKINTER_AVAILABLE:
return ""
try:
root = tk.Tk()
root.withdraw() # メインウィンドウを隠す
root.wm_attributes('-topmost', 1) # ダイアログを最前面に表示
folder_path = filedialog.askdirectory(initialdir=initial_dir)
root.destroy()
return folder_path if folder_path else ""
except Exception as e:
print(f"フォルダ選択ダイアログエラー: {e}")
return ""
def open_file_dialog(initial_dir: str = ".", file_types=None) -> str:
"""
Windows標準のファイル選択ダイアログを開き、選択されたパスを返す。
Args:
initial_dir: 初期表示ディレクトリ
file_types: ファイル種類のフィルタ
Returns:
選択されたファイルパス(キャンセル時は空文字列)
Note:
WSL環境では動作しない可能性があります。Windows環境での使用を想定。
"""
if not TKINTER_AVAILABLE:
return ""
if file_types is None:
file_types = [("Excel files", "*.xlsx;*.xls"), ("All files", "*.*")]
try:
root = tk.Tk()
root.withdraw()
root.wm_attributes('-topmost', 1)
file_path = filedialog.askopenfilename(
initialdir=initial_dir,
filetypes=file_types
)
root.destroy()
return file_path if file_path else ""
except Exception as e:
print(f"ファイル選択ダイアログエラー: {e}")
return ""
def render_configuration_form():
"""
起動時の設定フォームを表示。
アノテーター名とデータパスを入力してから進む。
フォルダ/ファイル選択ボタンも提供。
"""
st.title("IVUS合併症アノテーションツール - 設定")
st.markdown("---")
# Initialize session state for paths if not exists (空の初期値)
if 'temp_data_root' not in st.session_state:
st.session_state.temp_data_root = ""
if 'temp_excel_path' not in st.session_state:
st.session_state.temp_excel_path = ""
st.subheader("設定")
# Annotator name input
annotator = st.text_input(
"アノテーター名 *",
value="",
help="あなたの名前を入力してください(必須)",
placeholder="例: 田中"
)
st.markdown("---")
# Data Directory Path with folder picker
st.markdown("**データディレクトリパス** *")
st.caption("症例フォルダ(CHIBAMI_xxx_pre)が含まれるディレクトリを選択してください")
col1, col2 = st.columns([4, 1])
with col1:
data_root = st.text_input(
"データディレクトリパス",
value=st.session_state.temp_data_root,
help="「📁 参照」ボタンでフォルダを選択してください",
placeholder="「📁 参照」ボタンでフォルダを選択",
label_visibility="collapsed"
)
# Update session state when user types
if data_root != st.session_state.temp_data_root:
st.session_state.temp_data_root = data_root
with col2:
if st.button("📁 参照", key="btn_browse_data", use_container_width=True):
if TKINTER_AVAILABLE:
# Use current path as initial dir if exists, otherwise use home directory
initial = st.session_state.temp_data_root if st.session_state.temp_data_root else os.path.expanduser("~")
selected_path = open_folder_dialog(initial_dir=initial)
if selected_path:
st.session_state.temp_data_root = selected_path
st.rerun()
else:
st.warning("フォルダ選択機能はWindows環境でのみ利用可能です")
# Show current path validation
if data_root and os.path.exists(data_root):
st.success(f"✓ フォルダが見つかりました: {data_root}")
elif data_root:
st.error(f"✗ フォルダが見つかりません: {data_root}")
st.markdown("---")
# Excel Label Path with file picker
st.markdown("**Excelラベルファイルパス** *")
st.caption("正解ラベルが記載されたExcelファイル(CHIBAMI_case_list_xxx.xlsx)を選択してください")
col1, col2 = st.columns([4, 1])
with col1:
excel_path = st.text_input(
"Excelラベルファイルパス",
value=st.session_state.temp_excel_path,
help="「📄 参照」ボタンでExcelファイルを選択してください",
placeholder="「📄 参照」ボタンでファイルを選択",
label_visibility="collapsed"
)
# Update session state when user types
if excel_path != st.session_state.temp_excel_path:
st.session_state.temp_excel_path = excel_path
with col2:
if st.button("📄 参照", key="btn_browse_excel", use_container_width=True):
if TKINTER_AVAILABLE:
# Use directory of current path as initial dir if exists
initial = os.path.dirname(st.session_state.temp_excel_path) if st.session_state.temp_excel_path else os.path.expanduser("~")
selected_path = open_file_dialog(initial_dir=initial)
if selected_path:
st.session_state.temp_excel_path = selected_path
st.rerun()
else:
st.warning("ファイル選択機能はWindows環境でのみ利用可能です")
# Show current file validation
if excel_path and os.path.exists(excel_path):
st.success(f"✓ ファイルが見つかりました: {excel_path}")
elif excel_path:
st.error(f"✗ ファイルが見つかりません: {excel_path}")
st.markdown("---")
# Submit button
if st.button("アノテーション開始", type="primary", use_container_width=True):
# Validation
errors = []
if not annotator or annotator.strip() == "":
errors.append("❌ アノテーター名は必須です")
if not data_root or data_root.strip() == "":
errors.append("❌ データディレクトリパスは必須です - 「📁 参照」ボタンでフォルダを選択してください")
elif not os.path.exists(data_root):
errors.append(f"❌ データディレクトリが見つかりません: {data_root}")
if not excel_path or excel_path.strip() == "":
errors.append("❌ Excelラベルファイルパスは必須です - 「📄 参照」ボタンでファイルを選択してください")
elif not os.path.exists(excel_path):
errors.append(f"❌ Excelファイルが見つかりません: {excel_path}")
if errors:
for error in errors:
st.error(error)
else:
# Save configuration to session state
st.session_state.config_complete = True
st.session_state.annotator = annotator.strip()
st.session_state.data_root = data_root.strip()
st.session_state.excel_path = excel_path.strip()
st.session_state.csv_path = os.path.join(
data_root.strip(),
f"annotations_{annotator.strip()}.csv"
)
st.rerun()
def initialize_session_state():
"""設定完了後、セッション状態を初期化。"""
if 'config_complete' not in st.session_state:
st.session_state.config_complete = False
if not st.session_state.config_complete:
return
# Load case IDs
if 'case_ids' not in st.session_state:
st.session_state.case_ids = get_all_case_ids(st.session_state.data_root)
# Initialize current case index
if 'current_case_idx' not in st.session_state:
st.session_state.current_case_idx = 0
# Load ground truth labels (not displayed, only for CSV saving)
if 'ground_truth_labels' not in st.session_state:
st.session_state.ground_truth_labels = load_ground_truth_labels(
st.session_state.excel_path
)
# Load annotated cases
if 'annotated_cases' not in st.session_state:
st.session_state.annotated_cases = get_annotated_cases(
st.session_state.csv_path,
st.session_state.annotator
)
# Initialize zoom level (persistent across cases)
if 'zoom_percent' not in st.session_state:
st.session_state.zoom_percent = 80 # Default zoom level
def get_current_case_id() -> Optional[Union[int, float]]:
"""現在の症例IDを取得。"""
if (st.session_state.current_case_idx >= 0 and
st.session_state.current_case_idx < len(st.session_state.case_ids)):
return st.session_state.case_ids[st.session_state.current_case_idx]
return None
def render_sidebar_ui():
"""
サイドバーに進捗、ナビゲーション、設定を集約。
メイン画面の縦スペースを確保するために使用。
"""
with st.sidebar:
st.header(f"担当: {st.session_state.annotator}")
# --- 進捗状況 ---
total_cases = len(st.session_state.case_ids)
annotated_count = len(st.session_state.annotated_cases)
progress = annotated_count / total_cases if total_cases > 0 else 0
st.progress(progress)
st.caption(f"進捗: {progress:.1%} ({annotated_count}/{total_cases})")
st.markdown("---")
# --- 症例ナビゲーション ---
st.subheader("症例選択")
current_case_id = get_current_case_id()
case_display = [
f"症例 {cid}" + (" ✓" if cid in st.session_state.annotated_cases else "")
for cid in st.session_state.case_ids
]
selected_idx = st.selectbox(
"症例リスト",
range(len(st.session_state.case_ids)),
index=st.session_state.current_case_idx,
format_func=lambda i: case_display[i],
label_visibility="collapsed"
)
if selected_idx != st.session_state.current_case_idx:
st.session_state.current_case_idx = selected_idx
st.rerun()
# 前後の症例移動ボタンのコールバック
def change_case(amount: int):
new_idx = st.session_state.current_case_idx + amount
if 0 <= new_idx < len(st.session_state.case_ids):
st.session_state.current_case_idx = new_idx
col1, col2 = st.columns(2)
with col1:
st.button(
"← 前へ",
disabled=st.session_state.current_case_idx == 0,
use_container_width=True,
on_click=change_case,
args=(-1,)
)
with col2:
st.button(
"次へ →",
disabled=st.session_state.current_case_idx >= len(st.session_state.case_ids) - 1,
use_container_width=True,
on_click=change_case,
args=(1,)
)
st.markdown("---")
# --- 画像拡大率 ---
zoom_levels = [30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150]
if st.session_state.zoom_percent not in zoom_levels:
st.session_state.zoom_percent = 80
current_zoom_idx = zoom_levels.index(st.session_state.zoom_percent)
# Zoom変更用コールバック関数
def change_zoom(amount: int):
new_idx = current_zoom_idx + amount
if 0 <= new_idx < len(zoom_levels):
st.session_state.zoom_percent = zoom_levels[new_idx]
col_z1, col_z2, col_z3 = st.columns([1, 2, 1])
with col_z1:
st.button(
"-",
key="zoom_out",
use_container_width=True,
on_click=change_zoom,
args=(-1,)
)
with col_z2:
st.markdown(f"<div style='text-align: center; line-height: 2.3;'><b>{st.session_state.zoom_percent} %</b></div>", unsafe_allow_html=True)
with col_z3:
st.button(
"+",
key="zoom_in",
use_container_width=True,
on_click=change_zoom,
args=(1,)
)
def render_image_viewer(case_id: Union[int, float], container):
"""
スライダー付き画像ビューアーを表示。
Args:
case_id: 現在の症例ID
container: Streamlit container to render in
"""
images = get_case_images(st.session_state.data_root, case_id)
if len(images) == 0:
container.error(f"症例 {case_id} の画像が見つかりません")
return
# Initialize frame index in session state if not exists
frame_key = f"frame_idx_{case_id}"
if frame_key not in st.session_state:
st.session_state[frame_key] = 0
# ボタンが押されたときに実行される関数
def change_frame(amount: int):
current = st.session_state[frame_key]
new_val = current + amount
if 0 <= new_val < len(images):
st.session_state[frame_key] = new_val
# st.rerun()
# Image slider
frame_idx = container.slider(
"フレーム番号",
min_value=0,
max_value=len(images) - 1,
value=st.session_state[frame_key],
key=frame_key,
label_visibility="collapsed",
# on_change=lambda: None # Ensure state updates
)
# Display image with custom width based on global zoom level
try:
img = Image.open(images[frame_idx])
# Calculate display width based on zoom percentage from session state
display_width = int(DEFAULT_IMAGE_WIDTH * st.session_state.zoom_percent / 100)
container.image(
img,
caption=f"症例 {case_id} - フレーム {frame_idx + 1}/{len(images)}",
width=display_width
)
# Arrow buttons for frame navigation (画像の右下に配置)
col1, col2, col3 = container.columns([4, 0.5, 0.5], gap="small")
with col1:
st.write("") # Spacer
with col2:
st.button(
"◀",
key=f"prev_{case_id}",
on_click=change_frame,
args=(-1,),
)
with col3:
st.button(
"▶",
key=f"next_{case_id}",
on_click=change_frame,
args=(1,),
)
except Exception as e:
container.error(f"画像読み込みエラー: {e}")
# Keyboard navigation support (キーボードの矢印キーでフレーム移動)
container.caption("💡 ヒント: スライダーをクリックしてから矢印キー(←→)でフレーム移動、または◀▶ボタンをクリックできます")
inject_keyboard_shortcuts()
def render_annotation_form(container, case_id: Union[int, float]) -> dict:
"""アノテーション入力フォームを表示 - コンパクト版"""
# Q1: Prediction (横並び)
container.markdown("**Q1: 合併症予測**")
prediction = container.radio(
"Q1: 合併症予測",
PREDICTION_OPTIONS,
key=f"prediction_{case_id}",
horizontal=True, # 変更: 横並びにして省スペース化
label_visibility="collapsed",
index=1 # デフォルトは「なし」
)
# Q2: Confidence (横並び)
container.markdown("**Q2: 確信度**")
confidence = container.radio(
"確信度",
CONFIDENCE_OPTIONS,
index=3, # 50%
format_func=lambda x: f"{x}%",
key=f"confidence_{case_id}",
label_visibility="collapsed",
horizontal=True # 変更: 横並び
)
# Q3: Reasons
container.markdown("**Q3: 判断根拠**")
reasons = []
# チェックボックスは項目の長さによるため縦並びのまま
for reason in REASONS_OPTIONS:
if container.checkbox(reason, key=f"reason_{case_id}_{reason}"):
reasons.append(reason)
# Q4: Comment (高さを最小限に)
container.markdown("**Q4: 自由記述**")
comment = container.text_area(
"Q4: 自由記述",
value="",
key=f"comment_{case_id}",
height=120,
label_visibility="collapsed"
)
return {
'prediction': prediction,
'confidence': confidence,
'reasons': reasons,
'comment': comment
}
def render_complete_screen():
"""全てのアノテーションが完了した際に表示する終了画面"""
st.balloons() # 完了のお祝いエフェクト
st.markdown("""
<div style='text-align: center; padding: 50px;'>
<h1>🎉 アノテーション完了 🎉</h1>
<h3>全症例の判定が終了しました。ご協力ありがとうございました!</h3>
</div>
""", unsafe_allow_html=True)
st.success(f"データは以下の場所に保存されました:")
st.code(st.session_state.csv_path, language=None)
st.info("ブラウザのタブを閉じて終了してください。")
def validate_annotation(annotation_data: dict) -> tuple[bool, str]:
"""
保存前にアノテーションデータを検証。
Args:
annotation_data: アノテーションフィールドの辞書
Returns:
(有効かどうか, エラーメッセージ) のタプル
"""
if len(annotation_data['reasons']) == 0:
return False, "判断根拠を最低1つ選択してください (Q3)"
return True, ""
def save_and_advance(container, case_id: Union[int, float], annotation_data: dict):
"""
アノテーションを保存して次の症例へ進む。
Args:
container: Streamlit container to display messages in
case_id: 現在の症例ID
annotation_data: アノテーションフィールドの辞書
"""
is_valid, error_msg = validate_annotation(annotation_data)
if not is_valid:
container.error(error_msg)
return
try:
# Get ground truth label for this case (may be None)
ground_truth = st.session_state.ground_truth_labels.get(case_id, None)
# 見つからず,CaseIDに小数点を含む場合
if ground_truth is None:
try:
int_case_id = int(case_id)
ground_truth = st.session_state.ground_truth_labels.get(int_case_id, None)
except:
pass
save_annotation(
csv_path=st.session_state.csv_path,
case_id=case_id,
prediction=annotation_data['prediction'],
confidence=annotation_data['confidence'],
reasons=annotation_data['reasons'],
comment=annotation_data['comment'],
annotator=st.session_state.annotator,
ground_truth=ground_truth
)
# Update annotated cases
st.session_state.annotated_cases.add(case_id)
# Show success message
container.success(f"✓ 症例 {case_id} を保存しました!")
# Auto-advance to next unannotated case
next_idx = st.session_state.current_case_idx + 1
while next_idx < len(st.session_state.case_ids):
next_case_id = st.session_state.case_ids[next_idx]
if next_case_id not in st.session_state.annotated_cases:
st.session_state.current_case_idx = next_idx
st.rerun()
return
next_idx += 1
# If no more unannotated cases, stay on current or show completion
if next_idx < len(st.session_state.case_ids):
st.session_state.current_case_idx = next_idx
st.rerun()
else:
st.rerun()
except Exception as e:
container.error(f"保存エラー: {e}")
def main():
"""メインアプリケーションエントリーポイント。"""
initialize_session_state()
# Show configuration form if not completed
if not st.session_state.config_complete:
render_configuration_form()
return
# Check if there are any cases
if len(st.session_state.case_ids) == 0:
st.error(f"{st.session_state.data_root} に症例が見つかりません")
st.info("アプリケーションを再起動して、データディレクトリパスを確認してください。")
return
render_sidebar_ui()
# 完了判定ロジック
total_cases = len(st.session_state.case_ids)
annotated_count = len(st.session_state.annotated_cases)
if total_cases > 0 and annotated_count >= total_cases:
render_complete_screen()
return
current_case_id = get_current_case_id()
if current_case_id is None:
st.error("利用可能な症例がありません")
return
# Validate case directory
is_valid, error_msg = validate_case_directory(
st.session_state.data_root,
current_case_id
)
if not is_valid:
st.error(error_msg)
return
# Main layout: Left - Image viewer, Right - Navigation + Annotation
left_col, right_col = st.columns([3, 1])
# Left side: Image viewer
with left_col:
render_image_viewer(current_case_id, left_col)
# Right side: Navigation and Annotation
with right_col:
# Annotation form section
annotation_data = render_annotation_form(right_col, current_case_id)
# Save button
if st.button("💾 保存して次へ", type="primary", use_container_width=True, key=f"save_{current_case_id}"):
save_and_advance(right_col, current_case_id, annotation_data)
if __name__ == "__main__":
main()