import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
f1_score,
precision_score,
recall_score,
roc_auc_score,
)
from sklearn.model_selection import GridSearchCV, KFold, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
# ---- Analysis configuration (read by all functions below) ----
pca_ccr = 0.98  # cumulative explained-variance ratio threshold for choosing PCA dims
n_folds = 5  # number of folds for both outer and inner cross-validation
# Results table: one row per (DataType, Input, Target, Model) combination;
# functions read the current combination from df.loc[df_index].
df = pd.DataFrame(
    columns=[
        "DataType",
        "Input",
        "Target",
        "Model",
        "Accuracy",
        "Recall",
        "Precision",
        "F1",
        "AUC",
    ]
)
df_index = 0  # index of the results row currently being processed
max_counts = -1  # cap on number of combinations processed (-1 = no limit)
data_type_list = [
    "SmTIAS_Phone",
    "SmTIAS_Web",
    "HandyTCC_Phone",
    "HandyTCC_Web",
]  # dataset variants; used as the CSV filename infix ("Merged_<DataType>App.csv")
input_list = ["shape", "color", "texture", "all"]  # feature-subset choices
target_list = ["A", "B", "C", "Total", "Raw", "Conv"]  # target score choices
# target_list = ["Raw", "Conv"]  # alternative target list for follow-up analysis
model_list = [
    RandomForestClassifier(),
    AdaBoostClassifier(),
    SVC(),
]  # candidate classifiers
# Grid-search spaces, keyed by class name with the "Classifier" suffix removed
# (must match the "Model" value stored in df by the main loop).
hyper_params = {
    "RandomForest": {
        "n_estimators": [100, 200],
        "max_depth": [None, 10, 20],
        "min_samples_split": [2, 5],
        "min_samples_leaf": [1, 2],
        "bootstrap": [True, False],
    },
    "AdaBoost": {
        "n_estimators": [50, 70, 90, 120, 180, 200],
        "learning_rate": [10**i for i in range(-3, 1)],
    },
    "SVC": {
        "C": [10**i for i in range(-3, 3)],
        "kernel": ["linear", "rbf", "sigmoid"],
        "gamma": ["scale", "auto"],
    },
}
def make_dataset():
    """
    Build the feature matrix and binary high-stress labels for the current run.

    Reads ``Merged_<DataType>App.csv`` (DataType taken from the global results
    row ``df.loc[df_index]``), selects a feature subset according to the
    row's "Input" setting, and derives a 0/1 "label" column from the
    questionnaire scores according to the row's "Target" setting.

    Returns:
        tuple: ``(x, labels)`` where ``x`` is a pandas.DataFrame of features
        and ``labels`` is a pandas.Series of 0/1 high-stress flags.
    """
    all_data = pd.read_csv(f"Merged_{df.loc[df_index, 'DataType']}App.csv")
    # Renamed from `input` (shadowed the builtin).
    feature_type = df.loc[df_index, "Input"]
    if feature_type == "shape":
        x = all_data.loc[:, "shape-width":"shape-bottomRightY"]
    elif feature_type == "color":
        x = all_data.loc[:, "chiu-lateral-L-min":"fiveClick-tip-b-kurtosis"]
    elif feature_type == "texture":
        x = all_data.loc[:, "chiu-lateral-contrast":"fiveClick-tip-correlation"]
    else:
        # "all": the full feature range.
        x = all_data.loc[:, "shape-width":"fiveClick-tip-b-kurtosis"]
    # .copy() so the column assignments below mutate an independent frame
    # instead of a slice of all_data (avoids SettingWithCopyWarning /
    # potentially lost writes).
    scores = all_data.loc[:, "A01":"C08"].copy()
    scores["id"] = all_data["ID"]
    # A01-A03 are reverse-scored items (assumes a 1..4 response scale — TODO
    # confirm; 5 - value flips the direction).
    for invert in ["A01", "A02", "A03"]:
        scores[invert] = 5 - scores[invert]
    scores["A"] = scores.loc[:, "A01":"A10"].sum(axis=1)  # group A subtotal
    scores["B"] = scores.loc[:, "B07":"B29"].sum(axis=1)  # group B subtotal
    scores["C"] = scores.loc[:, "C01":"C08"].sum(axis=1)  # group C subtotal
    scores["Total"] = scores.loc[:, "A":"C"].sum(axis=1)  # overall score
    target = df.loc[df_index, "Target"]
    # High-stress labelling: 1 = high stress, 0 = otherwise.
    scores["label"] = 0
    if target == "Raw":
        # Raw-score criterion: B >= 31, or B >= 23 together with A + C >= 39.
        for idx, row in scores.iterrows():
            if row["B"] >= 31 or (row["B"] >= 23 and (row["A"] + row["C"]) >= 39):
                scores.loc[idx, "label"] = 1
    elif target == "Conv":
        # Converted-score criterion: each subscale sum is mapped to a 1-5
        # grade (1 = worst), then high stress is ConvB <= 11, or
        # ConvB <= 16 together with ConvAC <= 8.
        for idx, row in scores.iterrows():
            A1 = row["A01"] + row["A02"] + row["A03"]
            A2 = 15 - (row["A08"] + row["A09"] + row["A10"])
            B1 = row["B07"] + row["B08"] + row["B09"]
            B2 = row["B10"] + row["B11"] + row["B12"]
            B3 = row["B13"] + row["B14"] + row["B16"]
            B4 = row["B27"]
            B5 = row["B29"]
            C1 = 15 - (row["C01"] + row["C04"] + row["C07"])
            C2 = 15 - (row["C02"] + row["C05"] + row["C08"])
            ConvA1 = 1 if A1 >= 12 else 2 if A1 >= 10 else 3 if A1 >= 8 else 4 if A1 >= 6 else 5
            ConvA2 = 1 if A2 <= 4 else 2 if A2 <= 6 else 3 if A2 <= 8 else 4 if A2 <= 10 else 5
            ConvB1 = 1 if B1 >= 11 else 2 if B1 >= 8 else 3 if B1 >= 5 else 4 if B1 >= 4 else 5
            ConvB2 = 1 if B2 >= 10 else 2 if B2 >= 8 else 3 if B2 >= 5 else 4 if B2 >= 4 else 5
            ConvB3 = 1 if B3 >= 10 else 2 if B3 >= 7 else 3 if B3 >= 5 else 4 if B3 >= 4 else 5
            ConvB4 = 1 if B4 >= 4 else 2 if B4 >= 3 else 3 if B4 >= 2 else 5
            ConvB5 = 1 if B5 >= 4 else 2 if B5 >= 3 else 3 if B5 >= 2 else 5
            ConvC1 = 1 if C1 <= 4 else 2 if C1 <= 6 else 3 if C1 <= 8 else 4 if C1 <= 10 else 5
            ConvC2 = 1 if C2 <= 5 else 2 if C2 <= 7 else 3 if C2 <= 9 else 4 if C2 <= 11 else 5
            ConvAC = ConvA1 + ConvA2 + ConvC1 + ConvC2
            ConvB = ConvB1 + ConvB2 + ConvB3 + ConvB4 + ConvB5
            if ConvB <= 11 or (ConvB <= 16 and ConvAC <= 8):
                scores.loc[idx, "label"] = 1
    else:
        # A/B/C/Total targets: median split (>= median is "high").
        threshold = scores[target].median()
        scores.loc[scores[target] >= threshold, "label"] = 1
    return x, scores["label"]
def nested_cross_val(model, x, y):
    """
    Run nested cross-validation and store mean metrics in the global `df`.

    Outer loop: `n_folds`-fold KFold split for held-out evaluation.
    Inner loop: GridSearchCV (also `n_folds` folds) over the parameter grid
    keyed by the "Model" name stored in `df.loc[df_index, "Model"]`.

    Args:
        model: unfitted sklearn estimator to tune and evaluate.
        x (pandas.DataFrame): feature matrix.
        y (pandas.Series): binary 0/1 labels.

    Side effects:
        Writes mean Accuracy/Recall/Precision/F1/AUC into `df.loc[df_index]`.
    """
    # NOTE(review): no random_state, so fold assignment differs between runs.
    outer_cv = KFold(n_splits=n_folds, shuffle=True)
    accuracy_list = []
    recall_list = []
    precision_list = []
    f1_list = []
    auc_list = []
    pcadim_list = []
    # Outer cross-validation: each fold yields one held-out evaluation.
    for train, test in outer_cv.split(x, y):
        # Fold data (positional indices from KFold, hence .iloc).
        x_train = x.iloc[train]
        y_train = y.iloc[train]
        x_test = x.iloc[test]
        y_test = y.iloc[test]
        # Preprocessing: scaler and PCA are fit on the training fold only,
        # then applied to the test fold (no leakage).
        scaler = StandardScaler()
        x_train_scaled = scaler.fit_transform(x_train)
        # NOTE(review): calc_pca_dim standardizes its input again internally;
        # redundant here but harmless (standardization is idempotent).
        pca_dim = calc_pca_dim(x_train_scaled)
        pcadim_list.append(pca_dim)
        pca = PCA(n_components=pca_dim)
        x_train_pca = pca.fit_transform(x_train_scaled)
        x_test_scaled = scaler.transform(x_test)
        x_test_pca = pca.transform(x_test_scaled)
        # Inner loop: hyper-parameter grid search on the training fold.
        gs = GridSearchCV(
            model,
            param_grid=hyper_params[df.loc[df_index, "Model"]],
            cv=n_folds,
            scoring="accuracy",
            n_jobs=-1,
        )
        gs.fit(x_train_pca, y_train)
        best_model = gs.best_estimator_
        # Evaluate the tuned model on the held-out fold.
        y_pred = best_model.predict(x_test_pca)
        accuracy_list.append(accuracy_score(y_test, y_pred))
        recall_list.append(recall_score(y_test, y_pred))
        precision_list.append(precision_score(y_test, y_pred))
        f1_list.append(f1_score(y_test, y_pred))
        # NOTE(review): AUC computed from hard 0/1 predictions rather than
        # probabilities/decision scores — confirm this is intended.
        auc_list.append(roc_auc_score(y_test, y_pred))
    # Means of the per-fold metrics, written into the current results row.
    df.loc[df_index, "Accuracy"] = np.array(accuracy_list).mean()
    df.loc[df_index, "Recall"] = np.array(recall_list).mean()
    df.loc[df_index, "Precision"] = np.array(precision_list).mean()
    df.loc[df_index, "F1"] = np.array(f1_list).mean()
    df.loc[df_index, "AUC"] = np.array(auc_list).mean()
def calc_pca_dim(x):
    """
    Return the number of principal components needed to reach `pca_ccr`.

    The input is standardized, a full PCA is fit, and the explained-variance
    ratios are accumulated until the cumulative ratio reaches the global
    threshold `pca_ccr`.

    Args:
        x (array-like): samples-by-features matrix.

    Returns:
        int: smallest component count whose cumulative explained-variance
        ratio is >= `pca_ccr`. Falls back to the full dimensionality if the
        threshold is never reached (possible via floating-point rounding
        when `pca_ccr` is very close to 1.0) — the original code returned 0
        in that case, which would crash PCA(n_components=0) downstream.
    """
    scaler = StandardScaler()
    x_scaled = scaler.fit_transform(x)
    pca = PCA(n_components=None)  # keep all components to inspect the spectrum
    pca.fit(x_scaled)
    ccr = 0.0
    for i in range(pca.n_components_):
        ccr += pca.explained_variance_ratio_[i]
        if ccr >= pca_ccr:
            return i + 1  # component counts are 1-based
    # Threshold never reached: use every component.
    return pca.n_components_
def train_predict(x, y):
    """
    Train one RandomForest pipeline on a single train/test split and report.

    Simpler (non-nested) alternative to `nested_cross_val`: standardize,
    reduce to the PCA dimensionality chosen by `calc_pca_dim`, fit a
    RandomForestClassifier, then print accuracy and the confusion matrix.

    Args:
        x (pandas.DataFrame): feature matrix.
        y (pandas.Series): binary 0/1 labels.

    Side effects:
        Writes the accuracy into `df.loc[df_index, "Accuracy"]` and prints
        evaluation output.
    """
    print("\nStarting model training and prediction...")
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=0.2, random_state=42
    )
    input_dim = x_train.shape[1]
    print(f"Input dim:{input_dim} Train:{x_train.shape[0]}, Test: {x_test.shape[0]}")
    pca_dim = calc_pca_dim(x_train)
    pipe = Pipeline(
        [
            ("scaler", StandardScaler()),
            ("pca", PCA(n_components=pca_dim)),
            ("model", RandomForestClassifier()),
        ]
    )
    pipe.fit(x_train, y_train)
    y_pred = pipe.predict(x_test)
    accuracy = accuracy_score(y_test, y_pred)
    # Bug fix: was `df["accuracy"] = accuracy`, which created a new lowercase
    # column and broadcast the value to every row; record the result in the
    # current row's "Accuracy" column like nested_cross_val does.
    df.loc[df_index, "Accuracy"] = accuracy
    print(f"Accuracy: {accuracy:.2f}")
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred))
# Main entry point: iterate over every (dataset, feature subset, target,
# model) combination, run nested cross-validation, and save the results.
if __name__ == "__main__":
    print("Starting analysis...")
    counts = len(data_type_list) * len(input_list) * len(target_list) * len(model_list)
    if max_counts > 0:
        counts = min(counts, max_counts)  # optional cap on processed combinations
    for data_type in data_type_list:
        # Renamed loop variable from `input` (shadowed the builtin).
        for feature_type in input_list:
            for target in target_list:
                for model in model_list:
                    if df_index >= counts:
                        continue  # cap reached: skip all remaining combinations
                    # Class name without the "Classifier" suffix; must match
                    # the keys of hyper_params (e.g. "RandomForest").
                    model_name = model.__class__.__name__.replace("Classifier", "")
                    print(
                        f"\nAnalyzing {df_index + 1}/{counts} {data_type}, {feature_type}, {target}, {model_name}"
                    )
                    # Initialize the results row; make_dataset and
                    # nested_cross_val read the settings back from df.
                    df.loc[df_index, :] = [
                        data_type,
                        feature_type,
                        target,
                        model_name,
                        0,
                        0,
                        0,
                        0,
                        0,
                    ]
                    x, y = make_dataset()
                    # train_predict(x, y)  # simpler single-split alternative
                    nested_cross_val(model, x, y)
                    df_index += 1
    print("\nAnalysis complete.")
    print(df)  # show the results table
    df.to_csv("analysis_results.csv", index=False)  # persist results