
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary
# from torchviz import make_dot  # requires graphviz installed via apt; the plots aren't great, so skip for now
import time
import random
# MLP model: every layer is written out by hand
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out)
        return out
# MMLP model: hidden layers are generated automatically from the hidden_sizes
# list as fully connected layers, with a ReLU activation appended after each one
class MMLP(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(MMLP, self).__init__()
        self.layers = nn.ModuleList()
        for h in hidden_sizes:
            self.layers.append(nn.Linear(input_size, h))
            self.layers.append(nn.ReLU())
            input_size = h
        self.layers.append(nn.Linear(input_size, output_size))
        # Adding an activation after the final layer seriously hurts convergence
        # (root cause still to be analyzed). Softmax must not be added because
        # nn.CrossEntropyLoss already performs the softmax internally, and two
        # exponentiations in a row easily overflow. As for Sigmoid, the reason
        # is that the inputs would need to be normalized to [0, 1].
        # See the sanity check after this class.
        # self.layers.append(nn.Sigmoid())
        # self.layers.append(nn.Softmax(dim=1))
        # self.layers.append(nn.LogSoftmax(dim=1))

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x
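# A minimal sanity check for the comment above (illustrative values only, not
# part of the experiment): nn.CrossEntropyLoss is log-softmax followed by NLL,
# so the model must output raw logits rather than softmax/sigmoid probabilities.
_logits = torch.randn(4, 2)
_targets = torch.tensor([1, 0, 1, 0])
_ce = nn.CrossEntropyLoss()(_logits, _targets)
_nll = nn.NLLLoss()(torch.log_softmax(_logits, dim=1), _targets)
assert torch.allclose(_ce, _nll)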
# Dataset definition
class TensorDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        return self.features[index], self.labels[index]
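# Side note: torch.utils.data also ships a built-in TensorDataset with the same
# behavior for tensor inputs; the hand-rolled class above is equivalent and
# kept for clarity.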
# Training loop
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.train()
    for epoch in range(num_epochs):
        start_time = time.time()
        train_loss = 0.0
        train_corrects = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            # # Debugging aid: after some odd behavior, this verified that the
            # # data was still correct by the time it reached training.
            # for i, input in enumerate(inputs):
            #     ii = 1 if any(score >= 3.0 for score in input) else 0
            #     assert labels[i] == ii, f"{inputs} # {labels} # sample {i} is wrong"
            optimizer.zero_grad()
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            # CrossEntropyLoss expects integer class targets, hence .long();
            # for the BCELoss variant this would be criterion(outputs, labels.unsqueeze(1))
            loss = criterion(outputs, labels.long())
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
            train_corrects += torch.sum(preds == labels.data)
        train_loss = train_loss / len(train_loader.dataset)
        train_acc = train_corrects.double() / len(train_loader.dataset)
        print('Epoch [{}/{}], Loss: {:.4f}, Acc: {:.4f}, took time: {:.2f}s'
              .format(epoch + 1, num_epochs, train_loss, train_acc, time.time() - start_time))
# Evaluation loop
def test(model, dataloader, criterion):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    # Compared with the training loop, the key additions are model.eval() and
    # torch.no_grad(), which turn off gradient bookkeeping during evaluation.
    # torch.no_grad() is a context manager: no operation inside the block
    # tracks gradients, saving memory and time (training needs gradients, so
    # it cannot be used there). model.eval() switches BatchNorm and Dropout
    # layers to inference behavior to avoid inconsistent results, and also
    # saves memory and time; calling eval() during training would hurt
    # correctness, not just memory and time. A short illustration follows
    # this function.
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            # loss = criterion(outputs, labels.unsqueeze(1))  # BCELoss variant
            loss = criterion(outputs, labels.long())
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
    test_loss = running_loss / len(dataloader.dataset)
    test_acc = running_corrects.double() / len(dataloader.dataset)
    print('Test Loss: {:.4f} Acc: {:.4f}'.format(test_loss, test_acc))
    return test_loss, test_acc
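# A minimal illustration of the torch.no_grad() note above (assumed values,
# not part of the experiment): tensors computed inside the context do not
# track gradients.
_x = torch.ones(1, requires_grad=True)
with torch.no_grad():
    _y = _x * 2
assert _y.requires_grad is False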
# Load the data
df = pd.read_excel("data/data_src.xlsx")
src_features = df.iloc[:, 34:44].values.astype(np.float32)
# Label is 1 when the last column is empty, else 0 (s avoids shadowing the str builtin)
src_labels = np.array([1 if s == "" else 0 for s in df.iloc[:, -1].values]).astype(np.float32)
print("Total samples:", src_features.shape[0])
print("Feature dimension:", src_features.shape[1])
print("Number of classes:", len(set(src_labels)))
# Number of cross-validation folds
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random.randint(0, 1000))
# Model hyperparameters
input_size = src_features.shape[1]
hidden_sizes = [32, 128, 32]
output_size = len(set(src_labels))  # with CrossEntropyLoss the output size equals the number of classes; a sigmoid/BCE loss only fits binary classification with output size 1, so CrossEntropyLoss is used throughout (a BCE sketch follows this block)
lr = 0.001  # learning rate
num_epochs = 100
batch_size = 128
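# For reference only (a sketch, not used below): with nn.BCELoss the head
# would be a single sigmoid unit and float targets in place of class indices.
# The names here are illustrative, not part of the experiment.
_probs = torch.sigmoid(torch.randn(4, 1))        # probabilities in (0, 1)
_t = torch.tensor([[1.0], [0.0], [1.0], [0.0]])  # float labels, shape (N, 1)
_bce = nn.BCELoss()(_probs, _t)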
# Aggregate results across folds
fold_accuracy = []
fold_loss = []
model_name = 0
# Iterate over the folds
for fold, (train_idx, test_idx) in enumerate(skf.split(src_features, src_labels)):
    print(f"Fold [{fold+1}/{skf.n_splits}]")
    print("train_idx:", train_idx)
    print("test_idx:", test_idx)
    # Slice the data for this fold
    train_features = src_features[train_idx]
    train_labels = src_labels[train_idx]
    test_features = src_features[test_idx]
    test_labels = src_labels[test_idx]
    # Convert the numpy arrays to PyTorch tensors; float is used throughout
    # here. With CrossEntropyLoss the labels could be created as long integers
    # directly, saving the .long() casts later.
    train_features_tensor = torch.tensor(train_features, dtype=torch.float)
    train_labels_tensor = torch.tensor(train_labels, dtype=torch.float)
    test_features_tensor = torch.tensor(test_features, dtype=torch.float)
    test_labels_tensor = torch.tensor(test_labels, dtype=torch.float)
    # Build datasets and loaders (batch_size is defined above)
    train_dataset = TensorDataset(train_features_tensor, train_labels_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataset = TensorDataset(test_features_tensor, test_labels_tensor)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    print('--------------------------------')
    # Initialize the model
    # model = MLP(input_size, 32, 2)  # hand-written layers, unused for now
    model = MMLP(input_size, hidden_sizes, output_size)  # hidden_sizes is a list, one entry per hidden layer
    # Inspect the network structure (fall back to CPU when CUDA is unavailable)
    # print(model)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    summary(model.to(device), (input_size,), device=device)
    # Define the loss and optimizer inside the loop, not before the KFold,
    # so each fold's run stays independent
    criterion = nn.CrossEntropyLoss()
    # criterion = nn.BCELoss()  # for the sigmoid/BCE variant
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # optimizer = torch.optim.SGD(model.parameters(), lr=lr)  # only lr as a hyperparameter; suits small networks
    # Train the model
    train_model(model, train_loader, criterion, optimizer, num_epochs)
    # Evaluate the model
    test_loss, test_acc = test(model, test_loader, criterion)
    fold_accuracy.append(test_acc.item())
    fold_loss.append(test_loss)
    print(f'Accuracy for fold {fold}: {fold_accuracy[fold]*100} %, loss: {fold_loss[fold]}')
    print('--------------------------------')
    # Save the trained model for this fold
    torch.save(model.state_dict(), 'psychology_model_val_{}.pth'.format(str(model_name)))
    model_name += 1
print('K-FOLD CROSS VALIDATION RESULTS')
print(f'Fold accuracies: {fold_accuracy}')
print(f'Mean accuracy: {np.mean(fold_accuracy)}')
print(f'Mean loss: {np.mean(fold_loss)}')
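# Reloading a saved fold model later (a sketch, shown for fold 0; the
# constructor arguments must match the ones used above):
_m = MMLP(input_size, hidden_sizes, output_size)
_m.load_state_dict(torch.load('psychology_model_val_0.pth', map_location='cpu'))
_m.eval()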