import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary
# from torchviz import make_dot  # requires installing graphviz via apt, and the plots are not great, so skip for now
import time
import random


# Define the MLP model; every layer is written out by hand
# (kept for reference; the MMLP below is what the training loop actually uses)
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out)
        return out


# Define the MMLP model; fully connected hidden layers are generated
# automatically from the given list of sizes, with a ReLU activation appended
# after each hidden layer
class MMLP(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(MMLP, self).__init__()
        self.layers = nn.ModuleList()
        for h in hidden_sizes:
            self.layers.append(nn.Linear(input_size, h))
            self.layers.append(nn.ReLU())
            input_size = h
        self.layers.append(nn.Linear(input_size, output_size))
        # Adding an activation after the last layer severely hurts convergence;
        # the exact reason remains to be analyzed. Softmax must not be added,
        # because nn.CrossEntropyLoss() already applies (log-)softmax internally;
        # stacking two exponentiations in a row can easily overflow. As for
        # Sigmoid, the inputs are expected to be normalized to (0, 1) instead.
        # self.layers.append(nn.Sigmoid())
        # self.layers.append(nn.Softmax(dim=1))
        # self.layers.append(nn.LogSoftmax(dim=1))

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x


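# A minimal sanity check (hypothetical random logits) for the comment above:
# nn.CrossEntropyLoss applies log-softmax internally, so it matches nn.NLLLoss
# on nn.LogSoftmax outputs, which is why no final activation is appended.
_logits = torch.randn(4, 2)
_targets = torch.tensor([0, 1, 1, 0])
assert torch.allclose(nn.CrossEntropyLoss()(_logits, _targets),
                      nn.NLLLoss()(nn.LogSoftmax(dim=1)(_logits), _targets))

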
# Define the dataset, wrapping pre-built feature/label tensors
class TensorDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        return self.features[index], self.labels[index]


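# Side note (not part of the original design): torch.utils.data also ships a
# built-in TensorDataset with the same indexing behavior, so the custom class
# above could be swapped for `from torch.utils.data import TensorDataset`.

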
# Define the training function
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.train()

    for epoch in range(num_epochs):
        start_time = time.time()
        train_loss = 0.0
        train_corrects = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # # Debug check kept from an earlier investigation into strange results:
            # # verifies that each (features, label) pair is still consistent by the
            # # time it reaches training (the label should be 1 iff any score >= 3.0).
            # for i, sample in enumerate(inputs):
            #     expected = 1 if (sample >= 3.0).any() else 0
            #     assert labels[i] == expected, f"{inputs} # {labels} # mismatch at index {i}"

            optimizer.zero_grad()
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            # loss = criterion(outputs, labels.unsqueeze(1))  # for BCELoss with float labels
            loss = criterion(outputs, labels.long())  # CrossEntropyLoss expects long class indices
            loss.backward()
            optimizer.step()

            # criterion averages over the batch, so weight by batch size here
            train_loss += loss.item() * inputs.size(0)
            train_corrects += torch.sum(preds == labels.data)

        train_loss = train_loss / len(train_loader.dataset)
        train_acc = train_corrects.double() / len(train_loader.dataset)

        print('Epoch [{}/{}], Loss: {:.4f}, Acc: {:.4f}, took time: {:.2f}s'
              .format(epoch + 1, num_epochs, train_loss, train_acc, time.time() - start_time))


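# Quick check (hypothetical batch) for the `loss.item() * inputs.size(0)`
# weighting used above: CrossEntropyLoss defaults to reduction="mean", so
# scaling the mean loss by the batch size recovers the summed loss before the
# per-epoch average over the whole dataset.
_x = torch.randn(4, 2)
_y = torch.tensor([0, 1, 0, 1])
assert torch.allclose(nn.CrossEntropyLoss()(_x, _y) * 4,
                      nn.CrossEntropyLoss(reduction="sum")(_x, _y))

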
# Define the test function
def test(model, dataloader, criterion):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.eval()
    running_loss = 0.0
    running_corrects = 0

    # Compared with training, the test code mainly adds model.eval() and
    # torch.no_grad() to disable gradient computation during the test phase.
    # torch.no_grad() is a context manager: inside the block no operation tracks
    # gradients, which saves memory and time; training needs gradients, so it
    # cannot be used there. model.eval() switches BatchNorm and Dropout layers to
    # inference behavior to avoid inconsistent results, and also saves memory and
    # time; calling eval() in training mode could affect training correctness
    # (not just memory and time).
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            # loss = criterion(outputs, labels.unsqueeze(1))  # for BCELoss with float labels
            loss = criterion(outputs, labels.long())

            _, preds = torch.max(outputs, 1)

            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

    test_loss = running_loss / len(dataloader.dataset)
    test_acc = running_corrects.double() / len(dataloader.dataset)

    print('Test Loss: {:.4f} Acc: {:.4f}'.format(test_loss, test_acc))
    return test_loss, test_acc


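# A tiny illustration (hypothetical tensor) of the torch.no_grad() behavior
# described above: results computed inside the block do not track gradients.
_w = torch.ones(1, requires_grad=True)
with torch.no_grad():
    _out = _w * 2
assert _out.requires_grad is False

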
# Load the data
df = pd.read_excel("data/data_src.xlsx")
src_features = df.iloc[:, 34:44].values.astype(np.float32)
# Label is 1 when the last column is "是" ("yes"), otherwise 0
src_labels = np.array([1 if s == "是" else 0 for s in df.iloc[:, -1].values]).astype(np.float32)
print("Total number of samples:", src_features.shape[0])
print("Feature dimension:", src_features.shape[1])
print("Number of classes:", len(set(src_labels)))


# Define the number of cross-validation folds; note that a random random_state
# gives a different fold split on every run (use a fixed seed for reproducibility)
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random.randint(0, 1000))


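# Stratification sketch (hypothetical toy labels, not the real data): each test
# fold keeps the same class ratio as the full label array.
_toy_y = np.array([0, 0, 0, 0, 1, 1, 1, 1])
for _tr, _te in StratifiedKFold(n_splits=2).split(np.zeros((8, 1)), _toy_y):
    assert _toy_y[_te].mean() == _toy_y.mean()  # class ratio preserved per fold

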
# Define the model parameters
input_size = src_features.shape[1]
hidden_sizes = [32, 128, 32]
# With CrossEntropyLoss() the output size equals the number of classes; a
# sigmoid head only fits binary classification with an output size of 1, so
# CrossEntropyLoss is used uniformly here
output_size = len(set(src_labels))
lr = 0.001  # learning rate
num_epochs = 100
batch_size = 128


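# Shape sketch (hypothetical batch) for the CrossEntropyLoss setup above:
# logits of shape (N, C) paired with integer class targets of shape (N,).
_demo_loss = nn.CrossEntropyLoss()(torch.randn(8, output_size),
                                   torch.randint(0, output_size, (8,)))

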
# Define global result variables
fold_accuracy = []
fold_loss = []
model_name = 0


# Iterate over each fold
for fold, (train_idx, test_idx) in enumerate(skf.split(src_features, src_labels)):
    print(f"Fold [{fold+1}/{skf.n_splits}]")
    print("train_idx:", train_idx)
    print("test_idx:", test_idx)

    # Slice the data for this fold
    train_features = src_features[train_idx]
    train_labels = src_labels[train_idx]
    test_features = src_features[test_idx]
    test_labels = src_labels[test_idx]

    # Convert the numpy arrays to PyTorch tensors; float is used uniformly here.
    # With the CrossEntropy loss the labels could be created as long integers
    # directly, saving the conversion later on.
    train_features_tensor = torch.tensor(train_features, dtype=torch.float)
    train_labels_tensor = torch.tensor(train_labels, dtype=torch.float)
    test_features_tensor = torch.tensor(test_features, dtype=torch.float)
    test_labels_tensor = torch.tensor(test_labels, dtype=torch.float)

    # Build the datasets and data loaders; batch_size is defined above
    train_dataset = TensorDataset(train_features_tensor, train_labels_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataset = TensorDataset(test_features_tensor, test_labels_tensor)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    print('--------------------------------')

    # Initialize the MLP model
    # model = MLP(input_size, 32, 2)  # every layer written by hand; unused for now
    model = MMLP(input_size, hidden_sizes, output_size)  # hidden_sizes is a list, one entry per hidden layer

    # Inspect the network structure
    # print(model)
    # Use whichever device is available so this also runs without CUDA
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    summary(model.to(device), (input_size,), device=device.type)

    # Define the loss function and optimizer here, not before the KFold loop,
    # to keep each fold's validation independent
    criterion = nn.CrossEntropyLoss()
    # criterion = nn.BCELoss()  # enable when switching to a sigmoid output
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # optimizer = torch.optim.SGD(model.parameters(), lr=lr)  # lr is its only hyperparameter; suits small networks

    # Train the MLP model
    train_model(model, train_loader, criterion, optimizer, num_epochs)

    # Test the MLP model
    test_loss, test_acc = test(model, test_loader, criterion)

    fold_accuracy.append(test_acc.item())
    fold_loss.append(test_loss)
    print(f'Accuracy for fold {fold+1}: {fold_accuracy[fold]*100} %, loss: {fold_loss[fold]}')
    print('--------------------------------')
    # Save the trained model for this fold
    torch.save(model.state_dict(), 'psychology_model_val_{}.pth'.format(str(model_name)))
    model_name += 1

print('K-FOLD CROSS VALIDATION RESULTS')
print(f'Fold accuracies: {fold_accuracy}')
print(f'Mean accuracy: {np.mean(fold_accuracy)}')
print(f'Mean loss: {np.mean(fold_loss)}')