# StressAnalysis/analysis.py
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import GridSearchCV, KFold, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

pca_ccr = 0.98  # cumulative explained-variance ratio threshold for PCA
n_folds = 5  # number of cross-validation folds
df = pd.DataFrame(
    columns=[
        "DataType",
        "Input",
        "Target",
        "Model",
        "Accuracy",
        "Recall",
        "Precision",
        "F1",
        "AUC",
    ]
)
df_index = 0
max_counts = -1  # limit on the number of configurations to process (-1 = no limit)
data_type_list = [
    "SmTIAS_Phone",
    "SmTIAS_Web",
    "HandyTCC_Phone",
    "HandyTCC_Web",
]  # dataset types
input_list = ["shape", "color", "texture", "all"]  # input feature types
target_list = ["A", "B", "C", "Total"]  # target scores
model_list = [
    RandomForestClassifier(),
    AdaBoostClassifier(),
    SVC(),
]  # model types
hyper_params = {
    "RandomForest": {
        "n_estimators": [100, 200],
        "max_depth": [None, 10, 20],
        "min_samples_split": [2, 5],
        "min_samples_leaf": [1, 2],
        "bootstrap": [True, False],
    },
    "AdaBoost": {
        "n_estimators": [50, 70, 90, 120, 180, 200],
        "learning_rate": [10**i for i in range(-3, 1)],
    },
    "SVC": {
        "C": [10**i for i in range(-3, 3)],
        "kernel": ["linear", "rbf", "sigmoid"],
        "gamma": ["scale", "auto"],
    },
}
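# Note: the keys above must match the model class names with the "Classifier"
# suffix stripped (see model_name in the main loop below), since they select the
# parameter grid passed to GridSearchCV.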


def make_dataset():
    """
    Build the feature matrix and binary labels for the configuration in the current df row.
    """
    # print("\nPreparing dataset...")
    all_data = pd.read_csv(f"Merged_{df.loc[df_index, 'DataType']}App.csv")
    input = df.loc[df_index, "Input"]  # which feature group to use
    if input == "shape":
        # print("Using shape features...")
        x = all_data.loc[:, "shape-width":"shape-bottomRightY"]
    elif input == "color":
        # print("Using color features...")
        x = all_data.loc[:, "chiu-lateral-L-min":"fiveClick-tip-b-kurtosis"]
    elif input == "texture":
        # print("Using texture features...")
        x = all_data.loc[:, "chiu-lateral-contrast":"fiveClick-tip-correlation"]
    else:
        # print("Using all features...")
        x = all_data.loc[:, "shape-width":"fiveClick-tip-b-kurtosis"]  # all features
    scores = all_data.loc[:, "A01":"C08"]  # questionnaire scores
    invert_list = ["A01", "A02", "A03"]
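    # The loop below flips reverse-keyed items as 5 - score (this assumes the
    # items are scored on a 1-4 scale; adjust the constant if the scale differs).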
    for invert in invert_list:
        scores[invert] = 5 - scores[invert]  # reverse-scored items
    scores["A"] = scores.loc[:, "A01":"A10"].sum(axis=1)  # group A score
    scores["B"] = scores.loc[:, "B07":"B29"].sum(axis=1)  # group B score
    scores["C"] = scores.loc[:, "C01":"C08"].sum(axis=1)  # group C score
    scores["Total"] = scores.loc[:, "A":"C"].sum(axis=1)  # total score
    target = df.loc[df_index, "Target"]  # target score
    threshold = scores[target].median()  # use the median as the threshold
    # print(f'Threshold for {target}: {threshold}')
    # scores['Total'].plot.hist(bins=20, edgecolor='black')  # ヒストグラムを描画
    # import matplotlib.pyplot as plt
    # plt.title('Total Score Distribution')
    # plt.xlabel('Total Score')
    # plt.ylabel('Frequency')
    # plt.show()  # ヒストグラムを表示
    scores["label"] = 0
    scores.loc[scores[target] >= threshold, "label"] = 1  # label 1 for scores at or above the median
    # print(scores.head(3))
    return x, scores["label"]


def nested_cross_val(model, x, y):
    """
    Run nested cross-validation for the current configuration and store the mean metrics in df.
    """
    # print("\nStart nested cross-validation")
    outer_cv = KFold(n_splits=n_folds, shuffle=True)
    accuracy_list = []
    recall_list = []
    precision_list = []
    f1_list = []
    auc_list = []
    pcadim_list = []
    # Outer cross-validation loop
    for train, test in outer_cv.split(x, y):
        # Get the data for this fold
        x_train = x.iloc[train]
        y_train = y.iloc[train]
        x_test = x.iloc[test]
        y_test = y.iloc[test]

        # Preprocessing (standardization and dimensionality reduction)
        scaler = StandardScaler()
        x_train_scaled = scaler.fit_transform(x_train)
        pca_dim = calc_pca_dim(x_train_scaled)
        pcadim_list.append(pca_dim)
        pca = PCA(n_components=pca_dim)
        x_train_pca = pca.fit_transform(x_train_scaled)

        x_test_scaled = scaler.transform(x_test)
        x_test_pca = pca.transform(x_test_scaled)
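        # The scaler and PCA above are fitted on the outer training fold only and
        # merely applied to the test fold, so no test-fold information leaks into
        # the preprocessing.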

        # Hyperparameter optimization via grid search (inner cross-validation)
        gs = GridSearchCV(
            model,
            param_grid=hyper_params[df.loc[df_index, "Model"]],
            cv=n_folds,
            scoring="accuracy",
            n_jobs=-1,
        )
        gs.fit(x_train_pca, y_train)
        # print(f'Best parameters: {gs.best_params_}')
        # print(f'Best score: {gs.best_score_:.4f}')
        best_model = gs.best_estimator_
        # print(best_model)

        # Evaluate the best model on the held-out fold
        y_pred = best_model.predict(x_test_pca)
        accuracy_list.append(accuracy_score(y_test, y_pred))
        recall_list.append(recall_score(y_test, y_pred))
        precision_list.append(precision_score(y_test, y_pred))
        f1_list.append(f1_score(y_test, y_pred))
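        # Note: AUC is computed from hard 0/1 predictions here; scores from
        # predict_proba or decision_function would give a threshold-independent ROC AUC.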
        auc_list.append(roc_auc_score(y_test, y_pred))

    df.loc[df_index, "Accuracy"] = np.array(accuracy_list).mean()
    df.loc[df_index, "Recall"] = np.array(recall_list).mean()
    df.loc[df_index, "Precision"] = np.array(precision_list).mean()
    df.loc[df_index, "F1"] = np.array(f1_list).mean()
    df.loc[df_index, "AUC"] = np.array(auc_list).mean()
    # print(f'Accuracy: {np.array(accuracy_list).mean():.4f}')
    # print(f'Recall: {np.array(recall_list).mean():.4f}')
    # print(f'Precision: {np.array(precision_list).mean():.4f}')
    # print(f'F1: {np.array(f1_list).mean():.4f}')
    # print(f'AUC: {np.array(auc_list).mean():.4f}')


def calc_pca_dim(x):
    """
    Determine the number of PCA components needed to reach the cumulative
    explained-variance threshold pca_ccr (the input is standardized internally).
    """
    # print("\nCalculating PCA dimensions...")
    scaler = StandardScaler()
    x_scaled = scaler.fit_transform(x)
    pca = PCA(n_components=None)  # keep as many components as the input has dimensions
    pca.fit(x_scaled)
    ccr = 0
    pca_dim = 0
    for i in range(pca.n_components_):
        ccr += pca.explained_variance_ratio_[i]
        if ccr >= pca_ccr:
            pca_dim = i + 1  # i is zero-based, so add 1 to get the component count
            # print(f'Number of components to reach {pca_ccr:.0%} variance: {pca_dim}')
            break
    return pca_dim


def train_predict(x, y):
    """
    Train a model on a single train/test split and report accuracy
    (standalone baseline; not used in the main loop).
    """
    # Split the dataset into training and test sets
    print("\nStarting model training and prediction...")
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=0.2, random_state=42
    )
    input_dim = x_train.shape[1]
    print(f"Input dim:{input_dim} Train:{x_train.shape[0]}, Test: {x_test.shape[0]}")

    pca_dim = calc_pca_dim(x_train)
    pipe = Pipeline(
        [
            ("scaler", StandardScaler()),
            ("pca", PCA(n_components=pca_dim)),
            ("model", RandomForestClassifier()),
        ]
    )
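    # pipe.fit refits StandardScaler and PCA on x_train; calc_pca_dim above is
    # only used to choose n_components for the PCA step.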

    pipe.fit(x_train, y_train)
    y_pred = pipe.predict(x_test)
    # scaler = StandardScaler()
    # x_train = scaler.fit_transform(x_train)
    # x_test = scaler.transform(x_test)
    # model = RandomForestClassifier()
    # model.fit(x_train, y_train)
    # y_pred = model.predict(x_test)
    accuracy = accuracy_score(y_test, y_pred)
    df["accuracy"] = accuracy
    print(f"Accuracy: {accuracy:.2f}")
    # print(classification_report(y_test, y_pred))
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred))


# Main entry point
if __name__ == "__main__":
    print("Starting analysis...")
    counts = len(data_type_list) * len(input_list) * len(target_list) * len(model_list)
    if max_counts > 0:
        counts = min(counts, max_counts)

    for data_type in data_type_list:
        for input in input_list:  # feature group to use
            for target in target_list:
                for model in model_list:
                    if df_index >= counts:
                        continue

                    model_name = model.__class__.__name__.replace(
                        "Classifier", ""
                    )  # strip "Classifier" from the model class name
                    print(
                        f"\nAnalyzing {df_index + 1}/{counts} {data_type}, {input}, {target}, {model_name}"
                    )
                    df.loc[df_index, :] = [
                        data_type,
                        input,
                        target,
                        model_name,
                        0,
                        0,
                        0,
                        0,
                        0,
                    ]  # initialize the result row
                    x, y = make_dataset()  # build features and labels for this configuration
                    # train_predict(x, y)  # optional: single-split training and evaluation
                    nested_cross_val(model, x, y)  # run nested cross-validation
                    df_index += 1

    print("\nAnalysis complete.")
    print(df)  # display the results
    df.to_csv("analysis_results.csv", index=False)  # save the results to a CSV file
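
# Usage (assumption): run `python analysis.py` from a directory containing the
# Merged_<DataType>App.csv files (e.g. Merged_SmTIAS_PhoneApp.csv); the summary
# table is written to analysis_results.csv.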