diff --git a/CLAUDE.md b/CLAUDE.md index 9325a4a..53bbea5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -22,9 +22,10 @@ - `docs/03_TECH/TECH_03_デバッグオーバーレイ仕様.txt` — オーバーレイ表示項目、描画色、GUI 操作 - `docs/03_TECH/TECH_04_線検出精度向上方針.txt` — 線検出が最重要ファクターである理由、照明・影の課題、改善の方向性 - `docs/03_TECH/TECH_05_コースアウト復帰仕様.txt` — 復帰判定ロジック、復帰動作、パラメータ一覧、GUI 仕様 +- `docs/03_TECH/TECH_06_十字路分類モデル評価.txt` — モデル比較結果、採用モデル、F1 スコア(学習実行時に自動生成) ### 環境(セットアップ時に参照) -- `docs/04_ENV/ENV_01_技術スタック選定.txt` — ZMQ, PySide6, OpenCV, Picamera2, RPi.GPIO, python-dotenv +- `docs/04_ENV/ENV_01_技術スタック選定.txt` — ZMQ, PySide6, OpenCV, Picamera2, RPi.GPIO, python-dotenv, scikit-learn - `docs/04_ENV/ENV_02_PC環境構築手順.txt` — venv 作成、ライブラリインストール - `docs/04_ENV/ENV_03_RaspPi環境構築手順.txt` — SSH 接続、deploy.sh による転送、venv 構築、動作確認 - `docs/04_ENV/ENV_04_ディレクトリ構成.txt` — src/ の構成と実装状態 diff --git "a/docs/04_ENV/ENV_01_\346\212\200\350\241\223\343\202\271\343\202\277\343\203\203\343\202\257\351\201\270\345\256\232.txt" "b/docs/04_ENV/ENV_01_\346\212\200\350\241\223\343\202\271\343\202\277\343\203\203\343\202\257\351\201\270\345\256\232.txt" index 924416d..91f95fe 100644 --- "a/docs/04_ENV/ENV_01_\346\212\200\350\241\223\343\202\271\343\202\277\343\203\203\343\202\257\351\201\270\345\256\232.txt" +++ "b/docs/04_ENV/ENV_01_\346\212\200\350\241\223\343\202\271\343\202\277\343\203\203\343\202\257\351\201\270\345\256\232.txt" @@ -91,7 +91,23 @@ ・PWM 制御・GPIO 出力に必要な機能が揃っている. - 2-6. 環境変数管理: python-dotenv + 2-6. 機械学習: scikit-learn + + ・ライブラリ: scikit-learn, joblib + ・用途: PC 側での十字路分類モデルの学習・推論 + - 複数モデルの交差検証による比較評価 + - 最良モデルの保存・読み込み + + ■ 選定理由 + ・40×30 二値画像の 2 クラス分類であり,軽量なモデルで十分である. + ・SVM,ロジスティック回帰,MLP 等を統一的な API で比較できる. + ・NumPy ベースで既存コードとの統合が容易である. + + ■ 不採用とした候補 + ・PyTorch/TensorFlow: 入力が 1200 特徴量と小さく, + 深層学習フレームワークは過剰である. + + 2-7. 環境変数管理: python-dotenv ・用途: .env ファイルから環境変数を読み込む - PC の IP アドレス @@ -117,5 +133,6 @@ ・PySide6: GUI アプリケーション ・OpenCV: 画像処理・線検出 + ・scikit-learn: 十字路分類モデル ・pyzmq: Pi との通信 ・python-dotenv: 環境変数管理 diff --git "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" index 38babc7..9026ee0 100644 --- "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" +++ "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" @@ -55,9 +55,11 @@ │ └── main_window.py メインウィンドウ ├── comm/ 通信関連 │ └── zmq_client.py ZMQ 送受信 - ├── data/ 学習データ収集・仕分け + ├── data/ 学習データ収集・仕分け・学習 │ ├── collector.py 二値画像のラベル付き保存 - │ └── reviewer.py 仕分けレビュー GUI + │ ├── reviewer.py 仕分けレビュー GUI + │ ├── dataset.py データ読み込み + │ └── train.py モデル学習・評価・保存 ├── steering/ 操舵量計算(独立モジュール) │ ├── base.py 共通インターフェース │ ├── pd_control.py PD 制御の実装 diff --git a/requirements_pc.txt b/requirements_pc.txt index 89bfd92..ec15433 100644 --- a/requirements_pc.txt +++ b/requirements_pc.txt @@ -3,4 +3,6 @@ pyzmq==27.1.0 numpy==2.4.3 python-dotenv==1.2.2 +scikit-learn==1.6.1 +joblib==1.4.2 pytest==9.0.2 diff --git a/src/pc/data/__main__.py b/src/pc/data/__main__.py new file mode 100644 index 0000000..915bc7e --- /dev/null +++ b/src/pc/data/__main__.py @@ -0,0 +1,5 @@ +"""pc.data パッケージの直接実行で学習スクリプトを起動する""" + +from pc.data.train import main + +main() diff --git a/src/pc/data/dataset.py b/src/pc/data/dataset.py new file mode 100644 index 0000000..1e1980c --- /dev/null +++ b/src/pc/data/dataset.py @@ -0,0 +1,66 @@ +""" +dataset +confirmed/ から学習データを読み込むモジュール + +画像を flatten した特徴量ベクトルとラベルを返す +""" + +from pathlib import Path + +import cv2 +import numpy as np + +from pc.data.collector import ( + CONFIRMED_DIR, + LABEL_INTERSECTION, + LABEL_NORMAL, +) + + +def load_dataset( + confirmed_dir: Path = CONFIRMED_DIR, +) -> tuple[np.ndarray, np.ndarray]: + """confirmed/ から画像とラベルを読み込む + + Args: + confirmed_dir: 確定済みデータのディレクトリ + + Returns: + (X, y) のタプル + X: (n_samples, 1200) の特徴量行列(0.0/1.0) + y: (n_samples,) のラベル配列(1=intersection, 0=normal) + + Raises: + FileNotFoundError: 画像が見つからない場合 + """ + images: list[np.ndarray] = [] + labels: list[int] = [] + + for label_name, label_val in ( + (LABEL_INTERSECTION, 1), + (LABEL_NORMAL, 0), + ): + label_dir = confirmed_dir / label_name + if not label_dir.is_dir(): + continue + for img_path in sorted(label_dir.glob("*.png")): + img = cv2.imread( + str(img_path), cv2.IMREAD_GRAYSCALE, + ) + if img is None: + continue + # 0/255 → 0.0/1.0 に正規化して flatten + flat = (img.flatten() / 255.0).astype( + np.float32, + ) + images.append(flat) + labels.append(label_val) + + if len(images) == 0: + raise FileNotFoundError( + f"画像が見つかりません: {confirmed_dir}" + ) + + x = np.array(images) + y = np.array(labels) + return x, y diff --git a/src/pc/data/train.py b/src/pc/data/train.py new file mode 100644 index 0000000..1ac811d --- /dev/null +++ b/src/pc/data/train.py @@ -0,0 +1,393 @@ +""" +train +十字路分類モデルの学習・評価スクリプト + +複数モデルを 5-fold CV で比較し,結果をドキュメントに出力する +最良モデルを全データで再学習して params/ に保存する + +使い方: + $ cd src && python -m pc.data.train +""" + +import sys +from datetime import datetime +from pathlib import Path + +import joblib +import numpy as np +from sklearn.ensemble import RandomForestClassifier +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import ( + classification_report, + f1_score, +) +from sklearn.model_selection import StratifiedKFold +from sklearn.neural_network import MLPClassifier +from sklearn.preprocessing import StandardScaler +from sklearn.svm import SVC + +from common.json_utils import PARAMS_DIR +from pc.data.dataset import load_dataset + +# ── 定数 ────────────────────────────────────────────── + +N_FOLDS: int = 5 +RANDOM_STATE: int = 42 + +# モデル保存先 +MODEL_PATH: Path = PARAMS_DIR / "intersection_model.pkl" +SCALER_PATH: Path = PARAMS_DIR / "intersection_scaler.pkl" + +# ドキュメント出力先 +_PROJECT_ROOT: Path = ( + Path(__file__).resolve().parent.parent.parent.parent +) +REPORT_PATH: Path = ( + _PROJECT_ROOT / "docs" / "03_TECH" + / "TECH_06_十字路分類モデル評価.txt" +) + + +# ── モデル定義 ──────────────────────────────────────── + +def _build_models() -> list[tuple[str, object]]: + """比較対象のモデル一覧を返す + + Returns: + (名前, モデルインスタンス) のリスト + """ + return [ + ( + "LogisticRegression", + LogisticRegression( + max_iter=1000, + random_state=RANDOM_STATE, + ), + ), + ( + "SVM_RBF", + SVC( + kernel="rbf", + random_state=RANDOM_STATE, + ), + ), + ( + "SVM_Linear", + SVC( + kernel="linear", + random_state=RANDOM_STATE, + ), + ), + ( + "RandomForest", + RandomForestClassifier( + n_estimators=100, + random_state=RANDOM_STATE, + ), + ), + ( + "MLP_1layer", + MLPClassifier( + hidden_layer_sizes=(64,), + max_iter=500, + random_state=RANDOM_STATE, + ), + ), + ( + "MLP_2layer", + MLPClassifier( + hidden_layer_sizes=(128, 64), + max_iter=500, + random_state=RANDOM_STATE, + ), + ), + ] + + +# ── 評価 ────────────────────────────────────────────── + +def _evaluate_models( + x: np.ndarray, + y: np.ndarray, +) -> list[dict]: + """全モデルを StratifiedKFold CV で評価する + + Args: + x: 特徴量行列 + y: ラベル配列 + + Returns: + 各モデルの評価結果辞書のリスト + """ + skf = StratifiedKFold( + n_splits=N_FOLDS, + shuffle=True, + random_state=RANDOM_STATE, + ) + results: list[dict] = [] + + for name, model in _build_models(): + f1_scores: list[float] = [] + + for train_idx, test_idx in skf.split(x, y): + x_train, x_test = x[train_idx], x[test_idx] + y_train, y_test = y[train_idx], y[test_idx] + + # スケーリング + scaler = StandardScaler() + x_train_s = scaler.fit_transform(x_train) + x_test_s = scaler.transform(x_test) + + model_clone = _clone_model(name) + model_clone.fit(x_train_s, y_train) + y_pred = model_clone.predict(x_test_s) + + f1 = f1_score(y_test, y_pred, average="macro") + f1_scores.append(f1) + + results.append({ + "name": name, + "f1_mean": float(np.mean(f1_scores)), + "f1_std": float(np.std(f1_scores)), + "f1_scores": f1_scores, + }) + print( + f" {name:25s} " + f"F1={np.mean(f1_scores):.4f} " + f"(±{np.std(f1_scores):.4f})" + ) + + return results + + +def _clone_model(name: str) -> object: + """モデル名から新しいインスタンスを生成する + + Args: + name: モデル名 + + Returns: + モデルインスタンス + """ + for n, m in _build_models(): + if n == name: + return m + raise ValueError(f"不明なモデル: {name}") + + +# ── 最良モデルの再学習・保存 ────────────────────────── + +def _train_best_and_save( + best_name: str, + x: np.ndarray, + y: np.ndarray, +) -> str: + """最良モデルを全データで再学習して保存する + + Args: + best_name: 最良モデルの名前 + x: 全特徴量行列 + y: 全ラベル配列 + + Returns: + 全データでの classification_report 文字列 + """ + scaler = StandardScaler() + x_scaled = scaler.fit_transform(x) + + model = _clone_model(best_name) + model.fit(x_scaled, y) + y_pred = model.predict(x_scaled) + report = classification_report( + y, y_pred, + target_names=["normal", "intersection"], + ) + + # 保存 + PARAMS_DIR.mkdir(parents=True, exist_ok=True) + joblib.dump(model, MODEL_PATH) + joblib.dump(scaler, SCALER_PATH) + + return report + + +# ── ドキュメント出力 ────────────────────────────────── + +def _write_report( + results: list[dict], + best_name: str, + n_samples: int, + n_intersection: int, + n_normal: int, + full_report: str, +) -> None: + """評価結果をドキュメントとして出力する + + Args: + results: 各モデルの評価結果 + best_name: 最良モデルの名前 + n_samples: 全サンプル数 + n_intersection: intersection のサンプル数 + n_normal: normal のサンプル数 + full_report: 全データでの classification_report + """ + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M") + + # 結果テーブル + sorted_results = sorted( + results, key=lambda r: r["f1_mean"], reverse=True, + ) + table_lines: list[str] = [] + table_lines.append( + f" {'モデル':25s} {'F1(平均)':>10s}" + f" {'F1(標準偏差)':>14s}" + ) + table_lines.append(f" {'─' * 55}") + for r in sorted_results: + marker = " ← 採用" if r["name"] == best_name else "" + table_lines.append( + f" {r['name']:25s}" + f" {r['f1_mean']:10.4f}" + f" {r['f1_std']:14.4f}{marker}" + ) + table_str = "\n".join(table_lines) + + # fold 詳細 + fold_lines: list[str] = [] + for r in sorted_results: + scores_str = ", ".join( + f"{s:.4f}" for s in r["f1_scores"] + ) + fold_lines.append( + f" ・{r['name']}: [{scores_str}]" + ) + fold_str = "\n".join(fold_lines) + + # classification_report のインデント + report_indented = "\n".join( + f" {line}" for line in full_report.splitlines() + ) + + content = f"""\ +======================================================================== +十字路分類モデル評価 (Intersection Classifier Evaluation) +======================================================================== + + +1. 概要 (Overview) +------------------------------------------------------------------------ + + 1-0. 目的 + + 十字路(intersection)と通常区間(normal)を分類する + 二値画像分類モデルの比較評価結果を記録する. + + 1-1. 評価日時 + + ・実施日時: {timestamp} + + 1-2. データセット + + ・入力: 40×30 二値画像(1200 特徴量,0.0/1.0) + ・全サンプル数: {n_samples} + - intersection: {n_intersection} + - normal: {n_normal} + ・クラス比率: intersection:normal\ + = {n_intersection}:{n_normal} + + +2. 評価方法 (Evaluation Method) +------------------------------------------------------------------------ + + 2-1. 交差検証 + + ・手法: Stratified {N_FOLDS}-Fold Cross-Validation + ・指標: マクロ平均 F1 スコア + ・前処理: StandardScaler(fold ごとに fit) + ・乱数シード: {RANDOM_STATE} + + +3. 評価結果 (Results) +------------------------------------------------------------------------ + + 3-1. モデル比較(F1 スコア降順) + +{table_str} + + 3-2. 各 Fold の F1 スコア + +{fold_str} + + 3-3. 採用モデル + + ・モデル: {best_name} + ・保存先: params/intersection_model.pkl + ・スケーラ: params/intersection_scaler.pkl + + 3-4. 全データでの分類レポート(再学習後) + +{report_indented} +""" + + REPORT_PATH.parent.mkdir(parents=True, exist_ok=True) + with open( + REPORT_PATH, "w", encoding="utf-8", + ) as f: + f.write(content) + + print(f"\nドキュメント出力: {REPORT_PATH}") + + +# ── メイン ──────────────────────────────────────────── + +def main() -> None: + """学習・評価のメインフロー""" + print("=== 十字路分類モデルの学習・評価 ===\n") + + # データ読み込み + print("データ読み込み中...") + try: + x, y = load_dataset() + except FileNotFoundError as e: + print(f"エラー: {e}") + sys.exit(1) + + n_samples = len(y) + n_intersection = int(np.sum(y == 1)) + n_normal = int(np.sum(y == 0)) + print( + f" サンプル数: {n_samples}" + f" (intersection={n_intersection}," + f" normal={n_normal})\n" + ) + + # CV 評価 + print(f"{N_FOLDS}-Fold CV 評価中...") + results = _evaluate_models(x, y) + + # 最良モデル選定 + best = max(results, key=lambda r: r["f1_mean"]) + best_name = best["name"] + print( + f"\n最良モデル: {best_name}" + f" (F1={best['f1_mean']:.4f})\n" + ) + + # 全データで再学習・保存 + print("全データで再学習・保存中...") + full_report = _train_best_and_save(best_name, x, y) + print(f" モデル保存: {MODEL_PATH}") + print(f" スケーラ保存: {SCALER_PATH}") + + # ドキュメント出力 + _write_report( + results, best_name, + n_samples, n_intersection, n_normal, + full_report, + ) + + print("\n完了") + + +if __name__ == "__main__": + main()