You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
7.0 KiB
Python

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import DataLoader, Dataset
# 定义 MLP 模型
class MLP(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Linear -> Sigmoid."""

    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        # One hidden layer followed by a sigmoid output head.
        # (Submodules are registered in the same order as before so that
        # seeded weight initialization is reproducible.)
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Chain the stages instead of rebinding an intermediate each step.
        hidden = self.relu(self.fc1(x))
        return self.sigmoid(self.fc2(hidden))
class MMLP(nn.Module):
    """Multi-hidden-layer perceptron with a sigmoid output head.

    ``hidden_sizes`` is a sequence of hidden-layer widths; each hidden
    layer is Linear -> ReLU, and the head is Linear -> Sigmoid.
    """

    def __init__(self, input_size, hidden_sizes, output_size):
        super(MMLP, self).__init__()
        self.layers = nn.ModuleList()
        in_features = input_size
        for width in hidden_sizes:
            # Hidden stack: affine map followed by ReLU non-linearity.
            self.layers.append(nn.Linear(in_features, width))
            self.layers.append(nn.ReLU())
            in_features = width
        # Output head squashes activations into (0, 1) for binary targets.
        self.layers.append(nn.Linear(in_features, output_size))
        self.layers.append(nn.Sigmoid())

    def forward(self, x):
        # Apply every registered layer in registration order.
        for layer in self.layers:
            x = layer(x)
        return x
# 定义数据集
class TensorDataset(Dataset):
    """Minimal Dataset pairing a feature container with a label container."""

    def __init__(self, features, labels):
        # Containers are stored as-is; len/indexing are delegated to them.
        self.features = features
        self.labels = labels

    def __len__(self):
        # Sample count equals the length of the feature container.
        return len(self.features)

    def __getitem__(self, index):
        # Return the (feature, label) pair at the given position.
        sample = self.features[index]
        target = self.labels[index]
        return sample, target
# 定义训练函数
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train *model* in place for *num_epochs* epochs over *train_loader*.

    Assumes a single-unit sigmoid output trained with a binary loss
    (e.g. nn.BCELoss) and float labels of shape (batch,); the labels are
    unsqueezed to (batch, 1) to match the model output. The model is moved
    to GPU when one is available. Prints mean loss and accuracy per epoch;
    returns None.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.train()
    for epoch in range(num_epochs):
        train_loss = 0.0
        train_corrects = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            # BUG FIX: the original used torch.max(outputs, 1) on a
            # (batch, 1) sigmoid output, which always yields index 0, so
            # "accuracy" only counted 0-labels. Threshold the probability
            # at 0.5 to obtain real binary predictions.
            preds = (outputs > 0.5).float().squeeze(1)
            loss = criterion(outputs, labels.unsqueeze(1))
            loss.backward()
            optimizer.step()
            # Scale the batch-mean loss back up so the epoch average is
            # correct even with a ragged final batch.
            train_loss += loss.item() * inputs.size(0)
            train_corrects += torch.sum(preds == labels.data)
        train_loss = train_loss / len(train_loader.dataset)
        train_acc = train_corrects.double() / len(train_loader.dataset)
        print('Epoch [{}/{}], Loss: {:.4f}, Acc: {:.4f}'
              .format(epoch + 1, num_epochs, train_loss, train_acc))
# 定义测试函数
def test(model, dataloader, criterion):
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.eval()
running_loss = 0.0
running_corrects = 0
with torch.no_grad():
for inputs, labels in dataloader:
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels.unsqueeze(1))
_, preds = torch.max(outputs, 1)
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
test_loss = running_loss / len(dataloader.dataset)
test_acc = running_corrects.double() / len(dataloader.dataset)
print('Test Loss: {:.4f} Acc: {:.4f}'.format(test_loss, test_acc))
return test_loss, test_acc
# 加载数据
# ---- Data loading ----
df = pd.read_excel("data/data_src.xlsx")
# Feature columns 36..42 of the sheet; the last column holds the raw label.
src_features = df.iloc[:, 36:43].values.astype(np.float32)
# Positive class (1.0) when the label cell is the empty string, else 0.0.
# FIX: the original loop variable shadowed the builtin `str`.
src_labels = np.array(
    [1 if cell == "" else 0 for cell in df.iloc[:, -1].values]
).astype(np.float32)
print(src_labels)
print(len(src_labels))

# ---- Device report (train/test helpers select their own device too) ----
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

# ---- Cross-validation setup ----
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

# ---- Hyperparameters ----
input_size = src_features.shape[1]
print(input_size)
hidden_size = 32
output_size = 1
lr = 0.01
num_epochs = 50
# FIX: the original assigned batch_size twice (32, then 16) yet hard-coded
# 32 in the DataLoaders; keep the effective value and use the variable.
batch_size = 32

fold_accuracy = []
# Stratified K-fold: train a fresh model per fold and record its accuracy.
for fold, (train_idx, test_idx) in enumerate(skf.split(src_features, src_labels)):
    print(f"Fold [{fold+1}/{skf.n_splits}]")
    print("train_idx:", train_idx)
    print("test_idx:", test_idx)
    train_features = src_features[train_idx]
    train_labels = src_labels[train_idx]
    test_features = src_features[test_idx]
    test_labels = src_labels[test_idx]
    # numpy arrays -> PyTorch tensors
    train_features_tensor = torch.tensor(train_features, dtype=torch.float)
    train_labels_tensor = torch.tensor(train_labels, dtype=torch.float)
    test_features_tensor = torch.tensor(test_features, dtype=torch.float)
    test_labels_tensor = torch.tensor(test_labels, dtype=torch.float)
    # Datasets and loaders for this fold.
    train_dataset = TensorDataset(train_features_tensor, train_labels_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataset = TensorDataset(test_features_tensor, test_labels_tensor)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    print('--------------------------------')
    # Fresh model per fold: one hidden layer of `hidden_size`, sigmoid head.
    model = MMLP(input_size, [hidden_size], output_size)
    # Binary cross-entropy matches the sigmoid output.
    criterion = nn.BCELoss()
    # FIX: use the `lr` hyperparameter instead of a duplicated literal.
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # Train, then evaluate on the held-out fold. (The original's stray
    # model.train() after training and the commented-out manual loops
    # were dead code and have been removed.)
    train_model(model, train_loader, criterion, optimizer, num_epochs)
    test_loss, test_acc = test(model, test_loader, criterion)
    fold_accuracy.append(test_acc.item())
    print(f'Accuracy for fold {fold}: {fold_accuracy[fold]*100} %')
    print('--------------------------------')

print('K-FOLD CROSS VALIDATION RESULTS')
print(f'Fold accuracies: {fold_accuracy}')
print(f'Mean accuracy: {np.mean(fold_accuracy)}')