import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
f1_score,
precision_score,
recall_score,
roc_auc_score,
)
from sklearn.model_selection import GridSearchCV, KFold, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
pca_ccr = 0.98  # target cumulative contribution ratio (explained variance) for PCA
n_folds = 5  # number of folds for cross-validation (used for both outer and inner CV)
# Results table: one row per (DataType, Input, Target, Model) run; metric
# columns are filled in by nested_cross_val().
df = pd.DataFrame(
    columns=[
        "DataType",
        "Input",
        "Target",
        "Model",
        "Accuracy",
        "Recall",
        "Precision",
        "F1",
        "AUC",
    ]
)
df_index = 0  # index of the row in `df` currently being processed
max_counts = -1  # cap on the number of runs (-1 = no limit)
data_type_list = [
    "SmTIAS_Phone",
    "SmTIAS_Web",
    "HandyTCC_Phone",
    "HandyTCC_Web",
]  # dataset variants -- presumably device/app combinations; each maps to a Merged_*App.csv
input_list = ["shape", "color", "texture", "all"]  # feature groups to evaluate
target_list = ["A", "B", "C", "Total"]  # target questionnaire scores to predict
model_list = [
    RandomForestClassifier(),
    AdaBoostClassifier(),
    SVC(),
]  # candidate models
# Grid-search spaces, keyed by model name (class name with "Classifier" stripped).
hyper_params = {
    "RandomForest": {
        "n_estimators": [100, 200],
        "max_depth": [None, 10, 20],
        "min_samples_split": [2, 5],
        "min_samples_leaf": [1, 2],
        "bootstrap": [True, False],
    },
    "AdaBoost": {
        "n_estimators": [50, 70, 90, 120, 180, 200],
        "learning_rate": [10**i for i in range(-3, 1)],  # 0.001 .. 1
    },
    "SVC": {
        "C": [10**i for i in range(-3, 3)],  # 0.001 .. 100
        "kernel": ["linear", "rbf", "sigmoid"],
        "gamma": ["scale", "auto"],
    },
}
def make_dataset():
    """
    Build the feature matrix and binary label vector for the current run.

    Reads the merged CSV for the current DataType (from the global `df` row at
    `df_index`), selects the feature columns requested by the Input setting,
    aggregates the questionnaire items into group scores, and binarizes the
    Target score at its median.

    Returns:
        tuple[pd.DataFrame, pd.Series]: (feature matrix x, 0/1 label series).
    """
    all_data = pd.read_csv(f"Merged_{df.loc[df_index, 'DataType']}App.csv")
    feature_group = df.loc[df_index, "Input"]  # which feature group to use
    if feature_group == "shape":
        x = all_data.loc[:, "shape-width":"shape-bottomRightY"]
    elif feature_group == "color":
        x = all_data.loc[:, "chiu-lateral-L-min":"fiveClick-tip-b-kurtosis"]
    elif feature_group == "texture":
        x = all_data.loc[:, "chiu-lateral-contrast":"fiveClick-tip-correlation"]
    else:  # "all": the full feature range
        x = all_data.loc[:, "shape-width":"fiveClick-tip-b-kurtosis"]
    # .copy() prevents SettingWithCopyWarning and accidental mutation of
    # all_data when the score columns are modified below.
    scores = all_data.loc[:, "A01":"C08"].copy()
    # A01-A03 are reverse-scored items (assumes a 1..4 item scale -- TODO confirm).
    for invert in ["A01", "A02", "A03"]:
        scores[invert] = 5 - scores[invert]
    scores["A"] = scores.loc[:, "A01":"A10"].sum(axis=1)  # group A subtotal
    scores["B"] = scores.loc[:, "B07":"B29"].sum(axis=1)  # group B subtotal
    scores["C"] = scores.loc[:, "C01":"C08"].sum(axis=1)  # group C subtotal
    scores["Total"] = scores.loc[:, "A":"C"].sum(axis=1)  # overall score
    target = df.loc[df_index, "Target"]  # target score for this run
    threshold = scores[target].median()  # median split into two classes
    scores["label"] = 0
    scores.loc[scores[target] >= threshold, "label"] = 1  # 1 = at/above median
    return x, scores["label"]
def nested_cross_val(model, x, y):
    """
    Run nested cross-validation for `model` and store mean metrics in `df`.

    Outer loop: `n_folds`-fold CV for an unbiased performance estimate. Per
    outer fold the training split is standardized and PCA-reduced (fit on the
    training fold only, to avoid leakage into the test fold), then GridSearchCV
    (inner `n_folds`-fold CV) tunes hyper-parameters before scoring on the
    held-out fold.

    Args:
        model: scikit-learn estimator instance to tune and evaluate.
        x (pd.DataFrame): feature matrix.
        y (pd.Series): binary labels.

    Side effects:
        Writes the mean Accuracy/Recall/Precision/F1/AUC over the outer folds
        into `df.loc[df_index]`.
    """
    # NOTE(review): shuffle=True without random_state makes folds (and thus
    # reported metrics) non-reproducible between runs.
    outer_cv = KFold(n_splits=n_folds, shuffle=True)
    fold_scores = {"Accuracy": [], "Recall": [], "Precision": [], "F1": [], "AUC": []}
    for train_idx, test_idx in outer_cv.split(x, y):
        x_train, y_train = x.iloc[train_idx], y.iloc[train_idx]
        x_test, y_test = x.iloc[test_idx], y.iloc[test_idx]
        # Preprocessing fitted on the training fold only.
        scaler = StandardScaler()
        x_train_scaled = scaler.fit_transform(x_train)
        pca = PCA(n_components=calc_pca_dim(x_train_scaled))
        x_train_pca = pca.fit_transform(x_train_scaled)
        x_test_pca = pca.transform(scaler.transform(x_test))
        # Inner CV: grid search over the parameter space of the current model.
        gs = GridSearchCV(
            model,
            param_grid=hyper_params[df.loc[df_index, "Model"]],
            cv=n_folds,
            scoring="accuracy",
            n_jobs=-1,
        )
        gs.fit(x_train_pca, y_train)
        y_pred = gs.best_estimator_.predict(x_test_pca)
        fold_scores["Accuracy"].append(accuracy_score(y_test, y_pred))
        fold_scores["Recall"].append(recall_score(y_test, y_pred))
        fold_scores["Precision"].append(precision_score(y_test, y_pred))
        fold_scores["F1"].append(f1_score(y_test, y_pred))
        # NOTE(review): AUC computed from hard 0/1 predictions underestimates
        # the true ROC AUC; consider predict_proba / decision_function scores.
        fold_scores["AUC"].append(roc_auc_score(y_test, y_pred))
    for metric, values in fold_scores.items():
        df.loc[df_index, metric] = float(np.mean(values))
def calc_pca_dim(x, ccr_threshold=None):
    """
    Return the number of principal components needed to reach a target
    cumulative contribution ratio (cumulative explained variance ratio).

    Args:
        x: 2-D array-like feature matrix (standardized internally, so callers
            may pass raw or already-scaled data).
        ccr_threshold (float | None): target cumulative explained-variance
            ratio in (0, 1]; defaults to the module-level `pca_ccr`.

    Returns:
        int: smallest component count whose cumulative ratio >= threshold.
    """
    if ccr_threshold is None:
        ccr_threshold = pca_ccr
    x_scaled = StandardScaler().fit_transform(x)
    pca = PCA(n_components=None)  # keep all components so ratios can be inspected
    pca.fit(x_scaled)
    cumulative = np.cumsum(pca.explained_variance_ratio_)
    # argmax returns the first index where the threshold is met; +1 converts
    # the 0-based index into a component count.
    return int(np.argmax(cumulative >= ccr_threshold) + 1)
def train_predict(x, y):
    """
    Train a RandomForest pipeline on a single hold-out split and report results.

    Simpler alternative to nested_cross_val(): 80/20 split, standardize,
    PCA-reduce, fit a RandomForest, then print accuracy and the confusion
    matrix.

    Args:
        x (pd.DataFrame): feature matrix.
        y (pd.Series): binary labels.

    Side effects:
        Writes accuracy into df.loc[df_index, "Accuracy"] and prints metrics.
    """
    print("\nStarting model training and prediction...")
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=0.2, random_state=42
    )
    input_dim = x_train.shape[1]
    print(f"Input dim:{input_dim} Train:{x_train.shape[0]}, Test: {x_test.shape[0]}")
    # calc_pca_dim standardizes internally, so raw x_train is fine here even
    # though the pipeline below also scales before its own PCA step.
    pca_dim = calc_pca_dim(x_train)
    pipe = Pipeline(
        [
            ("scaler", StandardScaler()),
            ("pca", PCA(n_components=pca_dim)),
            ("model", RandomForestClassifier()),
        ]
    )
    pipe.fit(x_train, y_train)
    y_pred = pipe.predict(x_test)
    accuracy = accuracy_score(y_test, y_pred)
    # BUG FIX: was `df["accuracy"] = accuracy`, which broadcast the scalar into
    # a brand-new lowercase column for every row; store it in this run's
    # "Accuracy" cell instead, matching nested_cross_val().
    df.loc[df_index, "Accuracy"] = accuracy
    print(f"Accuracy: {accuracy:.2f}")
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred))
# Main entry point: run every (DataType, Input, Target, Model) combination,
# filling one row of `df` per run, then save the results table.
if __name__ == "__main__":
    import itertools

    print("Starting analysis...")
    counts = len(data_type_list) * len(input_list) * len(target_list) * len(model_list)
    if max_counts > 0:
        counts = min(counts, max_counts)  # honor the optional run cap
    combos = itertools.product(data_type_list, input_list, target_list, model_list)
    for data_type, input_type, target, model in combos:
        if df_index >= counts:
            # BUG FIX: was `continue`, which kept spinning through every
            # remaining combination doing nothing once the cap was reached.
            break
        # Drop the "Classifier" suffix, e.g. RandomForestClassifier -> RandomForest,
        # so the name matches the hyper_params keys.
        model_name = model.__class__.__name__.replace("Classifier", "")
        print(
            f"\nAnalyzing {df_index + 1}/{counts} {data_type}, {input_type}, {target}, {model_name}"
        )
        # Seed this run's row; metric columns are overwritten by nested_cross_val.
        df.loc[df_index, :] = [
            data_type,
            input_type,
            target,
            model_name,
            0,
            0,
            0,
            0,
            0,
        ]
        x, y = make_dataset()  # build features and labels for this run
        # train_predict(x, y)  # simpler single-split alternative
        nested_cross_val(model, x, y)
        df_index += 1
    print("\nAnalysis complete.")
    print(df)
    df.to_csv("analysis_results.csv", index=False)  # persist the results table