import argparse
import copy
import logging
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import yaml
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight
from torch import nn
from torch.utils.data import DataLoader, TensorDataset


class MLP(nn.Module):
    def __init__(self, config):
        super(MLP, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(len(config['feature_names']), 32),
            nn.ReLU(),
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, 32),
            nn.ReLU(),
            nn.Linear(32, config['nc']),
        )

    def forward(self, x):
        return self.model(x)


def load_and_split_data(config):
    parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    file_path = os.path.join(parent_dir, config['data_path'])
    data = pd.read_excel(file_path)
    X = data[config['feature_names']].values
    y = data[config['label_name']].values

    # Outer split: hold out one stratified fold (20%) for inference/evaluation.
    skf_outer = StratifiedKFold(n_splits=5, shuffle=True)
    train_index_outer, test_index_outer = next(skf_outer.split(X, y))
    X_train_val, X_infer = X[train_index_outer], X[test_index_outer]
    y_train_val, y_infer = y[train_index_outer], y[test_index_outer]

    # Inner split: carve a stratified validation fold out of the remaining data.
    skf_inner = StratifiedKFold(n_splits=5, shuffle=True)
    train_index_inner, test_index_inner = next(skf_inner.split(X_train_val, y_train_val))
    X_train, X_val = X_train_val[train_index_inner], X_train_val[test_index_inner]
    y_train, y_val = y_train_val[train_index_inner], y_train_val[test_index_inner]

    return X, y, X_train_val, y_train_val, X_train, y_train, X_val, y_val, X_infer, y_infer


def save_model(model_path, best_model):
    torch.save(best_model, model_path)
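
# Note: load_and_split_data assumes an Excel sheet whose columns include every
# name in config['feature_names'] plus config['label_name']. A minimal sketch
# of the expected layout (column names here are illustrative, not from the
# source):
#
#   feat_a | feat_b | ... | label
#   0.12   | 3.4    | ... | 0
#   0.56   | 1.2    | ... | 2
#
# Labels appear to be 0-based integer class ids in [0, config['nc']);
# inference_model below maps them back to 1-based ids for reporting.
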
def evaluate_model(model_path, X_infer, y_infer, config):
    # If a path to a model file is passed in, load the model from that path.
    if isinstance(model_path, str):
        model = MLP(config).to(config['device'])
        model.load_state_dict(torch.load(model_path, map_location=config['device']))  # load trained weights
    else:
        model = model_path
    # infer_data = pd.DataFrame(X_infer, columns=config['feature_names'])
    # infer_data[config['label_name']] = y_infer
    # infer_data.to_excel(os.path.join(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")), config['infer_path']), index=False)
    model.eval()
    with torch.no_grad():
        outputs = model(torch.from_numpy(X_infer).float().to(config['device']))
        _, predictions = torch.max(outputs, 1)
    y_pred = predictions.cpu().numpy()

    wrong_indices = np.where(y_infer != y_pred)[0]
    wrong_count = len(wrong_indices)
    total_count = len(y_infer)
    wrong_percentage = (wrong_count / total_count) * 100

    print("Infer Result:")
    logging.info("Infer Result:")
    print("Prediction errors:", wrong_count)
    print("Prediction error percentage:", wrong_percentage, "%")
    print("Total samples:", total_count)
    logging.info(f"Prediction errors: {wrong_count}")
    logging.info(f"Prediction error percentage: {wrong_percentage:.2f}%")
    logging.info(f"Total samples: {total_count}")

    precision = precision_score(y_infer, y_pred, average=None)
    recall = recall_score(y_infer, y_pred, average=None)
    f1 = f1_score(y_infer, y_pred, average=None)
    avg_precision = np.mean(precision)
    avg_recall = np.mean(recall)
    avg_f1 = np.mean(f1)
    for i in range(len(precision)):
        print(f"Class {i} Precision: {precision[i]:.4f}, Recall: {recall[i]:.4f}, F1: {f1[i]:.4f}")
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1 scores:", f1)
    print("Average precision:", avg_precision)
    print("Average recall:", avg_recall)
    print("Average F1 score:", avg_f1)
    print("Infer Result End")
    logging.info("Infer Result End")

    # Plot per-class precision/recall/F1 as bar charts and save the figure.
    fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15, 5))
    ax1.bar(np.arange(len(precision)), precision)
    ax1.set_title('Precision')
    ax2.bar(np.arange(len(recall)), recall)
    ax2.set_title('Recall')
    ax3.bar(np.arange(len(f1)), f1)
    ax3.set_title('F1 Score')
    parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    evaluate_result_path = os.path.join(parent_dir, config['evaluate_result_path'])
    plt.savefig(evaluate_result_path)

    return avg_f1, wrong_percentage, precision, recall, f1


def inference_model(model_path, X_infer, y_infer, config):
    # If a path to a model file is passed in, load the model from that path.
    if isinstance(model_path, str):
        model = MLP(config).to(config['device'])
        model.load_state_dict(torch.load(model_path, map_location=config['device']))  # load trained weights
    else:
        model = model_path
    model.eval()
    # Run inference.
    with torch.no_grad():
        outputs = model(torch.from_numpy(X_infer).float().to(config['device']))
        # Take the arg-max class for each sample.
        _, predictions = torch.max(outputs, 1)
    # Real-world class ids start at 1, while the model's start at 0.
    predictions += 1
    # print("Predictions:", predictions.cpu().numpy())
    # Return the predictions as a plain Python list.
    return predictions.cpu().numpy().tolist()
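
# Example usage of inference_model (a sketch; 'X_new' is an illustrative
# feature array, not from the source):
#
#   preds = inference_model(os.path.join(parent_dir, config['model_path']),
#                           X_new, None, config)
#
# inference_model never reads y_infer (it is only referenced in the
# commented-out export above), so any placeholder such as None works; the
# returned list contains 1-based class ids.
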
def train_detect(config):
    X, y, X_train_val, y_train_val, X_train, y_train, X_val, y_val, X_infer, y_infer = load_and_split_data(config)

    if config['data_train'] == 'train_val':
        train_dataset = TensorDataset(torch.from_numpy(X_train_val).float().to(config['device']),
                                      torch.from_numpy(y_train_val).long().to(config['device']))
        val_dataset = TensorDataset(torch.from_numpy(X_infer).float().to(config['device']),
                                    torch.from_numpy(y_infer).long().to(config['device']))
        class_weights = torch.tensor(compute_class_weight('balanced', classes=np.unique(y_train_val), y=y_train_val),
                                     dtype=torch.float32).to(config['device'])
        logging.info(f"Class weights: {class_weights}")
    elif config['data_train'] == 'train':
        train_dataset = TensorDataset(torch.from_numpy(X_train).float().to(config['device']),
                                      torch.from_numpy(y_train).long().to(config['device']))
        val_dataset = TensorDataset(torch.from_numpy(X_val).float().to(config['device']),
                                    torch.from_numpy(y_val).long().to(config['device']))
        class_weights = torch.tensor(compute_class_weight('balanced', classes=np.unique(y_train), y=y_train),
                                     dtype=torch.float32).to(config['device'])
        logging.info(f"Class weights: {class_weights}")
    elif config['data_train'] == 'all':
        train_dataset = TensorDataset(torch.from_numpy(X).float().to(config['device']),
                                      torch.from_numpy(y).long().to(config['device']))
        val_dataset = TensorDataset(torch.from_numpy(X).float().to(config['device']),
                                    torch.from_numpy(y).long().to(config['device']))
        X_infer = X
        y_infer = y
        class_weights = torch.tensor(compute_class_weight('balanced', classes=np.unique(y), y=y),
                                     dtype=torch.float32).to(config['device'])
        logging.info(f"Class weights: {class_weights}")
    else:
        print("Error: Set data_train first in yaml!")
        logging.error("Error: Set data_train first in yaml!")
        # The original fell through with undefined datasets; fail fast instead.
        raise ValueError(f"Unknown data_train value: {config['data_train']!r}")

    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=config['batch_size'])

    model = MLP(config).to(config['device'])
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'])
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, config['step_size'], config['gamma'])

    best_val_f1, best_val_recall, best_val_precision, best_epoch, best_model = train_model(
        model, train_loader, val_loader, criterion, optimizer, scheduler, config)

    # Save the best model's state dict.
    parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    model_path = os.path.join(parent_dir, config['model_path'])
    save_model(model_path, best_model)

    logging.info(f"Best Validation F1 Score (Macro): {best_val_f1:.4f}")
    logging.info(f"Best Validation Recall (Macro): {best_val_recall:.4f}")
    logging.info(f"Best Validation Precision (Macro): {best_val_precision:.4f}")
    logging.info(f"Best Epoch: {best_epoch + 1}")
    print(f"Best Validation F1 Score (Macro): {best_val_f1:.4f}")
    print(f"Best Validation Recall (Macro): {best_val_recall:.4f}")
    print(f"Best Validation Precision (Macro): {best_val_precision:.4f}")
    print(f"Best Epoch: {best_epoch + 1}")

    # Restore the best weights before evaluation; the live model otherwise
    # carries the last epoch's parameters rather than the best ones.
    model.load_state_dict(best_model)
    avg_f1, wrong_percentage, precision, recall, f1 = evaluate_model(model, X_infer, y_infer, config)
    return avg_f1, wrong_percentage, precision, recall, f1


def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, config):
    n_epochs = config['n_epochs']
    best_val_f1 = 0.0
    best_val_recall = 0.0
    best_val_precision = 0.0
    best_epoch = -1
    best_model = None
    patience = config['early_stop_patience']
    trigger_times = 0
    train_loss_history = []
    train_acc_history = []
    val_loss_history = []
    val_acc_history = []
    val_f1_history = []
    val_precision_history = []
    val_recall_history = []
    plt.rcParams['figure.max_open_warning'] = 50

    for epoch in range(n_epochs):
        # Training phase
        model.train()
        train_loss, train_acc = 0, 0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            train_acc += torch.sum(preds == targets.data)
        train_loss /= len(train_loader.dataset)
        train_acc = train_acc.double().cpu() / len(train_loader.dataset)

        # Update the learning rate.
        scheduler.step()

        # Validation phase
        model.eval()
        val_loss, val_acc, all_preds, all_targets = 0, 0, [], []
        with torch.no_grad():
            for inputs, targets in val_loader:
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                val_acc += torch.sum(preds == targets.data)
                all_preds.extend(preds.cpu().numpy())
                all_targets.extend(targets.cpu().numpy())
        val_loss /= len(val_loader.dataset)
        val_acc = val_acc.double().cpu() / len(val_loader.dataset)

        class_precisions_m = precision_score(all_targets, all_preds, average='macro')
        class_recalls_m = recall_score(all_targets, all_preds, average='macro')
        class_f1_scores_m = f1_score(all_targets, all_preds, average='macro')

        log_line = (f'Epoch {epoch+1:03d} | Train Loss: {train_loss:.4f} | Train Accuracy: {train_acc:.4f} | '
                    f'Validation Loss: {val_loss:.4f} | Validation Accuracy: {val_acc:.4f} | '
                    f'Validation Mean Precision: {class_precisions_m:.4f} | '
                    f'Validation Mean Recall: {class_recalls_m:.4f} | '
                    f'Validation Mean F1_score: {class_f1_scores_m:.4f}')
        logging.info(log_line)
        print(log_line)

        train_loss_history.append(train_loss)
        train_acc_history.append(train_acc)
        val_loss_history.append(val_loss)
        val_acc_history.append(val_acc)
        val_f1_history.append(class_f1_scores_m)
        val_precision_history.append(class_precisions_m)
        val_recall_history.append(class_recalls_m)

        # Plot the training and validation curves each epoch.
        plt.close('all')
        fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15, 5))
        ax1.plot(train_loss_history, label='Train Loss')
        ax1.plot(val_loss_history, label='Validation Loss')
        ax1.set_title('Loss')
        ax1.legend()
        ax2.plot(train_acc_history, label='Train Accuracy')
        ax2.plot(val_acc_history, label='Validation Accuracy')
        ax2.set_title('Accuracy')
        ax2.legend()
        ax3.plot(val_f1_history, label='Validation F1')
        ax3.plot(val_precision_history, label='Validation Precision')
        ax3.plot(val_recall_history, label='Validation Recall')
        ax3.set_title('Precision Recall F1-Score (Macro Mean)')
        ax3.legend()
        # Save the figure.
        parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
        train_process_path = os.path.join(parent_dir, config['train_process_path'])
        plt.savefig(train_process_path)

        # Early stopping on validation macro F1.
        if class_f1_scores_m > best_val_f1:
            best_val_f1 = class_f1_scores_m
            best_val_recall = class_recalls_m
            best_val_precision = class_precisions_m
            best_epoch = epoch
            # state_dict() returns references to the live tensors; deep-copy
            # the snapshot so later epochs do not overwrite it.
            best_model = copy.deepcopy(model.state_dict())
            trigger_times = 0
        else:
            trigger_times += 1
            if trigger_times >= patience:
                logging.info(f'Early stopping at epoch {epoch + 1} | Best epoch: {best_epoch + 1}')
                print(f'Early stopping at epoch {epoch + 1} | Best epoch: {best_epoch + 1}')
                break

    return best_val_f1, best_val_recall, best_val_precision, best_epoch, best_model
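
# Standalone evaluation of a saved checkpoint (a sketch; paths come from the
# same config the training run used):
#
#   X, y, *_, X_infer, y_infer = load_and_split_data(config)
#   evaluate_model(os.path.join(parent_dir, config['model_path']),
#                  X_infer, y_infer, config)
#
# Note that load_and_split_data reshuffles (StratifiedKFold with shuffle=True
# and no fixed random_state), so this inference fold will not match the one
# used during training.
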
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', type=str, default='config.yaml', help='Path to the configuration file')
    args = parser.parse_args()
    with open(args.config, 'r') as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    # Configure logging.
    parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    log_path = os.path.join(parent_dir, config['log_path'])
    logging.basicConfig(filename=log_path, level=logging.INFO,
                        format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

    list_avg_f1 = []
    list_wrong_percentage = []
    list_precision = []
    list_recall = []
    list_f1 = []
    train_times = 1 if config['data_train'] == 'all' else config['experiments_count']
    for i in range(train_times):
        avg_f1, wrong_percentage, precision, recall, f1 = train_detect(config)
        list_avg_f1.append(avg_f1)
        list_wrong_percentage.append(wrong_percentage)
        list_precision.append(precision)
        list_recall.append(recall)
        list_f1.append(f1)

    # Average the per-run metrics, per class and overall.
    avg_precision_per_class = [sum(p[i] for p in list_precision) / len(list_precision) for i in range(len(list_precision[0]))]
    avg_recall_per_class = [sum(r[i] for r in list_recall) / len(list_recall) for i in range(len(list_recall[0]))]
    avg_f1_per_class = [sum(s[i] for s in list_f1) / len(list_f1) for i in range(len(list_f1[0]))]

    summary = (f"Result: Avg F1: {sum(list_avg_f1) / len(list_avg_f1):.4f} "
               f"Avg Wrong Percentage: {sum(list_wrong_percentage) / len(list_wrong_percentage):.2f}%")
    logging.info(summary)
    logging.info(f"Result: Avg Precision: {avg_precision_per_class} | {np.mean(list_precision)}")
    logging.info(f"Result: Avg Recall: {avg_recall_per_class} | {np.mean(list_recall)}")
    logging.info(f"Result: Avg F1: {avg_f1_per_class} | {np.mean(list_f1)}")
    print(summary)
    print(f"Result: Avg Precision: {avg_precision_per_class} | {np.mean(list_precision)}")
    print(f"Result: Avg Recall: {avg_recall_per_class} | {np.mean(list_recall)}")
    print(f"Result: Avg F1: {avg_f1_per_class} | {np.mean(list_f1)}")
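
# A minimal config.yaml sketch for reference. The keys are the ones this
# script reads; all values below are illustrative assumptions, not from the
# source. Paths are resolved relative to the script's parent directory.
#
#   data_path: data/dataset.xlsx
#   feature_names: [feat_a, feat_b, feat_c]
#   label_name: label
#   nc: 3                          # number of classes
#   device: cpu                    # or e.g. cuda:0
#   data_train: train              # train | train_val | all
#   batch_size: 32
#   learning_rate: 0.001
#   step_size: 50                  # StepLR period
#   gamma: 0.5                     # StepLR decay factor
#   n_epochs: 200
#   early_stop_patience: 20
#   experiments_count: 5
#   model_path: models/mlp.pth
#   log_path: logs/train.log
#   train_process_path: results/train_process.png
#   evaluate_result_path: results/evaluate_result.png
#   infer_path: results/infer.xlsx  # only needed if the commented-out export is re-enabled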