import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary
# from torchviz import make_dot  # needs graphviz installed via apt and the plots are not great, so skip for now
import time
import random

# Define the MLP model; every layer is written out by hand
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out)
        return out

# Define the MMLP model: fully connected hidden layers are generated automatically
# from a list of sizes, with a ReLU activation appended after each hidden layer
class MMLP(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(MMLP, self).__init__()
        self.layers = nn.ModuleList()
        for h in hidden_sizes:
            self.layers.append(nn.Linear(input_size, h))
            self.layers.append(nn.ReLU())
            input_size = h
        self.layers.append(nn.Linear(input_size, output_size))
        # An activation after the final layer badly hurts convergence. Softmax must
        # not be added because nn.CrossEntropyLoss() already performs the softmax
        # step internally, so adding it means two exponentiations in a row, which
        # overflows easily. Sigmoid is unsuitable as well, since it presumes the
        # targets are normalized to (0, 1).
        #self.layers.append(nn.Sigmoid())
        #self.layers.append(nn.Softmax(dim=1))
        #self.layers.append(nn.LogSoftmax(dim=1))

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

# Define the dataset (a minimal (features, labels) wrapper)
class TensorDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        return self.features[index], self.labels[index]

# Define the training function
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.train()
    for epoch in range(num_epochs):
        start_time = time.time()
        train_loss = 0.0
        train_corrects = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)  # predicted class = index of the largest logit
            #loss = criterion(outputs, labels.unsqueeze(1))  # target shape needed by nn.BCELoss
            loss = criterion(outputs, labels.long())  # CrossEntropyLoss wants long class indices
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
            train_corrects += torch.sum(preds == labels.data)
        train_loss = train_loss / len(train_loader.dataset)
        train_acc = train_corrects.double() / len(train_loader.dataset)
        print('Epoch [{}/{}], Loss: {:.4f}, Acc: {:.4f}, took time: {:.2f}s'
              .format(epoch + 1, num_epochs, train_loss, train_acc, time.time() - start_time))

# Define the test function
def test(model, dataloader, criterion):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    # Compared with training, testing mainly adds model.eval() and torch.no_grad()
    # to switch off gradient computation during evaluation. torch.no_grad() is a
    # context manager: no operation inside the block tracks gradients, which saves
    # memory and time (training needs gradients, so it cannot be used there).
    # model.eval() puts BatchNorm and Dropout layers into inference mode to avoid
    # inconsistent results at inference time; calling eval() during training would
    # hurt correctness, not just memory and time.
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            #loss = criterion(outputs, labels.unsqueeze(1))  # target shape needed by nn.BCELoss
            loss = criterion(outputs, labels.long())
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
    test_loss = running_loss / len(dataloader.dataset)
    test_acc = running_corrects.double() / len(dataloader.dataset)
    print('Test Loss: {:.4f} Acc: {:.4f}'.format(test_loss, test_acc))
    return test_loss, test_acc
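# Side note, not called by the pipeline below: a minimal sketch (with made-up tensor
# values) backing the MMLP comment that nn.CrossEntropyLoss() already contains the
# softmax step, i.e. CrossEntropyLoss(logits, y) equals NLLLoss(LogSoftmax(logits), y),
# which is why the model outputs raw logits. Run it manually if you want to verify.
def _check_cross_entropy_contains_softmax():
    logits = torch.tensor([[2.0, -1.0], [0.5, 1.5]])  # raw, un-activated outputs (made up)
    targets = torch.tensor([0, 1])                    # class indices (made up)
    ce = nn.CrossEntropyLoss()(logits, targets)
    nll = nn.NLLLoss()(nn.LogSoftmax(dim=1)(logits), targets)
    assert torch.allclose(ce, nll), "CrossEntropyLoss should equal LogSoftmax + NLLLoss"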
# Load the data
df = pd.read_excel("data/data_src.xlsx")
src_features = df.iloc[:, 34:44].values.astype(np.float32)
# "是" means "yes" in the source spreadsheet; map it to the positive class
src_labels = np.array([1 if s == "是" else 0 for s in df.iloc[:, -1].values]).astype(np.float32)
print("Total number of samples:", src_features.shape[0])
print("Feature dimension:", src_features.shape[1])
print("Number of classes:", len(set(src_labels)))

# Number of cross-validation folds
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random.randint(0, 1000))

# Model hyperparameters
input_size = src_features.shape[1]
hidden_sizes = [32, 128, 32]
output_size = len(set(src_labels))  # with CrossEntropyLoss the output size equals the number of classes; a sigmoid loss only fits binary classification with size 1, so CrossEntropyLoss is used throughout
lr = 0.001  # learning rate
num_epochs = 100
batch_size = 128

# Global result containers
fold_accuracy = []
fold_loss = []
model_name = 0

# Iterate over the folds
for fold, (train_idx, test_idx) in enumerate(skf.split(src_features, src_labels)):
    print(f"Fold [{fold + 1}/{skf.n_splits}]")
    print("train_idx:", train_idx)
    print("test_idx:", test_idx)

    # Slice the data
    train_features = src_features[train_idx]
    train_labels = src_labels[train_idx]
    test_features = src_features[test_idx]
    test_labels = src_labels[test_idx]

    # Convert numpy arrays to PyTorch tensors, uniformly float here. With
    # CrossEntropyLoss the labels could be created as long integers directly,
    # saving the cast later on.
    train_features_tensor = torch.tensor(train_features, dtype=torch.float)
    train_labels_tensor = torch.tensor(train_labels, dtype=torch.float)
    test_features_tensor = torch.tensor(test_features, dtype=torch.float)
    test_labels_tensor = torch.tensor(test_labels, dtype=torch.float)

    # Build datasets and loaders; batch_size is defined above
    train_dataset = TensorDataset(train_features_tensor, train_labels_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataset = TensorDataset(test_features_tensor, test_labels_tensor)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    print('--------------------------------')
    # Initialize the model
    #model = MLP(input_size, 32, 2)  # hand-written layers, unused for now
    model = MMLP(input_size, hidden_sizes, output_size)  # hidden_sizes is a list, giving multiple layers

    # Inspect the network structure (fall back to CPU when no GPU is available)
    # print(model)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    summary(model.to(device), (input_size,), device="cuda" if torch.cuda.is_available() else "cpu")

    # Define the loss function and optimizer. They must not be created before the
    # K-fold loop, so that each fold's run stays independent.
    criterion = nn.CrossEntropyLoss()
    #criterion = nn.BCELoss()  # use together with a final Sigmoid
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    #optimizer = torch.optim.SGD(model.parameters(), lr=lr)  # only one hyperparameter (lr); suits small networks

    # Train the model
    train_model(model, train_loader, criterion, optimizer, num_epochs)

    # Test the model
    test_loss, test_acc = test(model, test_loader, criterion)
    fold_accuracy.append(test_acc.item())
    fold_loss.append(test_loss)
    print(f'Accuracy for fold {fold + 1}: {fold_accuracy[fold] * 100} %, loss: {fold_loss[fold]}')
    print('--------------------------------')

    # Save the trained model for this fold
    torch.save(model.state_dict(), 'psychology_model_val_{}.pth'.format(str(model_name)))
    model_name += 1

print('K-FOLD CROSS VALIDATION RESULTS')
print(f'Fold accuracies: {fold_accuracy}')
print(f'Mean accuracy: {np.mean(fold_accuracy)}')
print(f'Mean loss: {np.mean(fold_loss)}')
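# Hedged sketch of the binary alternative mentioned in the comments above (BCELoss
# plus a final Sigmoid): nn.BCEWithLogitsLoss fuses the two in a numerically stable
# way, sidestepping the overflow problem noted for stacked activations. This helper
# is illustrative only and is not wired into the K-fold loop; it reuses input_size,
# hidden_sizes, and lr from above, everything else is made up.
def _binary_head_step_sketch(features_tensor, labels_tensor):
    bin_model = MMLP(input_size, hidden_sizes, 1)   # a single raw logit per sample
    bin_criterion = nn.BCEWithLogitsLoss()          # Sigmoid + BCELoss, fused
    bin_optimizer = torch.optim.Adam(bin_model.parameters(), lr=lr)
    bin_optimizer.zero_grad()
    logits = bin_model(features_tensor)                               # shape (N, 1)
    loss = bin_criterion(logits, labels_tensor.float().unsqueeze(1))  # float targets, shape (N, 1)
    loss.backward()
    bin_optimizer.step()
    preds = (torch.sigmoid(logits) >= 0.5).long().squeeze(1)  # threshold probabilities at 0.5
    return loss.item(), preds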
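# Hedged sketch, not executed above: reloading one of the saved fold checkpoints for
# inference. The constructor arguments must match those used at save time; the file
# name follows the pattern written by torch.save above (fold 0 shown here).
def _load_fold_model(path='psychology_model_val_0.pth'):
    restored = MMLP(input_size, hidden_sizes, output_size)
    restored.load_state_dict(torch.load(path, map_location='cpu'))  # load weights onto the CPU
    restored.eval()  # inference mode: Dropout/BatchNorm layers behave deterministically
    return restored

# Example usage (assumes the loop above has already written the checkpoint):
# model0 = _load_fold_model()
# with torch.no_grad():
#     probs = torch.softmax(model0(test_features_tensor), dim=1)  # per-class probabilities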