# StressAnalysis / analysis.py
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import KFold, cross_val_score, cross_validate, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

pca_ccr = 0.98  # cumulative contribution (explained variance) ratio threshold for PCA
# Results table: one row per experiment configuration; filled in as the run progresses.
df = pd.DataFrame(columns=['input', 'target', 'pca_dim', 'accuracy', 'recall', 'precision', 'F1', 'AUC'])
df_index = 0  # index of the row in `df` currently being processed

def make_dataset():
    """
    Build the feature matrix and binary label vector for one experiment.

    Reads 'SmTIAS_PhoneApp.csv', selects a feature-column subset according
    to the 'input' value of the current row of the module-level results
    DataFrame ``df``, aggregates the questionnaire item scores into group
    subtotals and a total, and labels each sample 1 when its target score
    is at or above the median, else 0.

    Returns:
        tuple[pd.DataFrame, pd.Series]: (feature matrix, 0/1 label series)
    """
    print("\nPreparing dataset...")
    all_data = pd.read_csv('SmTIAS_PhoneApp.csv')
    # `input_type` renamed from `input`, which shadowed the builtin.
    input_type = df.loc[df_index, 'input']
    if input_type == "shape":
        print("Using shape features...")
        x = all_data.loc[:, 'shape-width':'shape-bottomRightY']
    elif input_type == "color":
        print("Using color features...")
        x = all_data.loc[:, 'chiu-lateral-L-min':'fiveClick-tip-b-kurtosis']
    elif input_type == "texture":
        print("Using texture features...")
        x = all_data.loc[:, 'chiu-lateral-contrast':'fiveClick-tip-correlation']
    else:
        print("Using all features...")
        x = all_data.loc[:, 'shape-width':'fiveClick-tip-b-kurtosis']  # all feature columns
    # .copy() so the score transformations below never write back into
    # `all_data` (and don't trigger SettingWithCopyWarning).
    scores = all_data.loc[:, 'A01':'C08'].copy()
    # Reverse-scored items; `5 - v` assumes a 1-4 item scale — TODO confirm.
    for item in ('A01', 'A02', 'A03'):
        scores[item] = 5 - scores[item]
    scores['A'] = scores.loc[:, 'A01':'A10'].sum(axis=1)  # group A subtotal
    scores['B'] = scores.loc[:, 'B07':'B29'].sum(axis=1)  # group B subtotal
    scores['C'] = scores.loc[:, 'C01':'C08'].sum(axis=1)  # group C subtotal
    scores['Total'] = scores.loc[:, 'A':'C'].sum(axis=1)  # overall score
    target = df.loc[df_index, 'target']  # which score column to binarize
    threshold = scores[target].median()  # median split -> balanced classes
    print(f'Threshold for {target}: {threshold}')
    scores['label'] = 0
    scores.loc[scores[target] >= threshold, 'label'] = 1  # 1 = at/above median
    return x, scores['label']

def cross_val(x, y):
    """
    Run 5-fold cross-validation of the scaler -> PCA -> random-forest
    pipeline and print the mean accuracy, recall, precision, F1 and AUC.

    Args:
        x: feature matrix (n_samples, n_features).
        y: binary label vector.
    """
    print("\nStarting cross-validation...")
    pca_dim = calc_pca_dim(x)
    # Fixed random_state so every metric is computed on the same folds.
    kfold_cv = KFold(n_splits=5, shuffle=True, random_state=42)
    pipe = Pipeline([("scaler", StandardScaler()),
                     ("pca", PCA(n_components=pca_dim)),
                     ("model", RandomForestClassifier())])
    # One cross_validate pass fits each fold once and scores all metrics on
    # identical splits; the original called cross_val_score five times,
    # refitting 25 models on differently shuffled folds per metric.
    scoring = {"accuracy": "accuracy", "recall": "recall",
               "precision": "precision", "F1": "f1", "AUC": "roc_auc"}
    results = cross_validate(pipe, x, y, cv=kfold_cv, scoring=scoring)
    mean = {name: results[f"test_{name}"].mean() for name in scoring}
    print(f"Accuracy: {mean['accuracy']:.3f}, recall: {mean['recall']:.3f}, "
          f"precision: {mean['precision']:.3f}, F1: {mean['F1']:.3f}, AUC: {mean['AUC']:.3f}")

def calc_pca_dim(x, ccr_threshold=None):
    """
    Return the number of principal components needed to reach a cumulative
    explained-variance (contribution) ratio threshold.

    Args:
        x: feature matrix (n_samples, n_features).
        ccr_threshold: cumulative explained-variance ratio to reach;
            defaults to the module-level ``pca_ccr``.

    Returns:
        int: component count, at least 1; falls back to all components when
        the threshold is never reached (the original returned 0 in that
        case, which would break ``PCA(n_components=0)`` downstream).
    """
    if ccr_threshold is None:
        ccr_threshold = pca_ccr
    print("\nCalculating PCA dimensions...")
    x_scaled = StandardScaler().fit_transform(x)
    pca = PCA(n_components=None)  # keep every component so we can inspect the ratios
    pca.fit(x_scaled)
    cumulative = pca.explained_variance_ratio_.cumsum()
    # First index whose cumulative ratio reaches the threshold (side='left'
    # matches the original `ccr >= pca_ccr` condition).
    first = int(cumulative.searchsorted(ccr_threshold))
    pca_dim = pca.n_components_ if first >= len(cumulative) else first + 1
    # Original message hard-coded "95%" regardless of the actual threshold.
    print(f'Number of components to reach {ccr_threshold:.0%} variance: {pca_dim}')
    return pca_dim

def train_predict(x, y):
    """
    Train a scaler -> PCA -> random-forest pipeline on an 80/20 split,
    print the test accuracy and confusion matrix, and record the accuracy
    in the current row of the module-level results DataFrame ``df``.

    Args:
        x: feature matrix (n_samples, n_features).
        y: binary label vector.
    """
    print("\nStarting model training and prediction...")
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)
    input_dim = x_train.shape[1]
    print(f"Input dim:{input_dim} Train:{x_train.shape[0]}, Test: {x_test.shape[0]}")

    # Choose the PCA dimensionality from the training portion only, so no
    # information from the test split leaks into the transform.
    pca_dim = calc_pca_dim(x_train)
    pipe = Pipeline([("scaler", StandardScaler()),
                     ("pca", PCA(n_components=pca_dim)),
                     ("model", RandomForestClassifier())])
    pipe.fit(x_train, y_train)
    y_pred = pipe.predict(x_test)

    accuracy = accuracy_score(y_test, y_pred)
    # Write into the current result row; the original `df['accuracy'] = accuracy`
    # assigned the whole column, clobbering every other experiment's row.
    df.loc[df_index, 'accuracy'] = accuracy
    print(f'Accuracy: {accuracy:.2f}')
    print('Confusion Matrix:')
    print(confusion_matrix(y_test, y_pred))

# Entry point: run a single experiment configuration and print the result table.
if __name__ == "__main__":
    df.loc[df_index,:] = ['all', 'Total', 0, 0, 0, 0, 0, 0]  # initialize the result row (input, target, metrics)
    x, y = make_dataset()  # build features and labels for the configured input/target
    train_predict(x, y)  # train the model and evaluate on the held-out split
    # cross_val(x, y, df[idx])  # run cross-validation
    print("\nAnalysis complete.")
    print(df)  # show the results table