"""FastAPI service exposing training, evaluation and inference endpoints.

Each request writes its artifacts (weighted feature spreadsheet, log file and,
for training, the model and result images) into a per-endpoint directory next
to this file. When the module is run as a script those directories are served
as static files so the URLs returned to the client can be downloaded.
"""
import atexit
import datetime
import logging
import os
import time
from typing import List

import numpy as np
import uvicorn
import yaml
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from utils.common import MLModel
from utils.feature_process import create_feature_df, apply_feature_weights, Features

app = FastAPI()

# Switch controlling whether log messages are echoed to stdout as well.
PRINT_LOG = True

# Format shared by every per-request log file.
_LOG_FORMAT = '%(asctime)s %(levelname)s: %(message)s'
_LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'

# File handler installed for the most recent request (None before the first).
_request_log_handler = None


def log_print(message):
    """Log *message* at INFO level and, if PRINT_LOG is set, print it too."""
    logging.info(message)
    if PRINT_LOG:
        print(message)


def flush_log():
    """Flush every handler attached to the root logger."""
    for handler in logging.getLogger().handlers:
        handler.flush()


# Flush once at interpreter exit. Registering inside each request handler
# would add one more exit callback per request without ever flushing earlier.
atexit.register(flush_log)


class PredictionResult(BaseModel):
    """Response body of the inference endpoint."""
    predictions: list


class ClassificationResult(BaseModel):
    """Metrics returned by the train and evaluate endpoints.

    precision / recall / f1 are indexable sequences (presumably one entry per
    class -- confirm against MLModel); wrong_percentage is a single float.
    """
    precision: list
    recall: list
    f1: list
    wrong_percentage: float


# Allow cross-origin requests from any origin.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)

# Location of the YAML configuration file, relative to this module.
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "config/config.yaml"))


def _load_config():
    """Read and return the YAML configuration as a fresh dict."""
    # NOTE: FullLoader is used on a local, trusted file; do not point
    # config_path at user-supplied content.
    with open(config_path, 'r') as f:
        return yaml.load(f, Loader=yaml.FullLoader)


def _artifact_path(directory, filename):
    """Return the absolute path of *filename* inside *directory*."""
    return os.path.abspath(os.path.join(directory, filename))


def _configure_request_logging(log_path):
    """Route root-logger output to *log_path* for the current request.

    logging.basicConfig() silently does nothing once the root logger has a
    handler, so calling it per request only ever creates the first log file.
    Instead, the handler installed for the previous request is removed and
    closed, and a new FileHandler is attached. Handlers installed by other
    code are left untouched.
    """
    global _request_log_handler
    root = logging.getLogger()
    if _request_log_handler is not None:
        root.removeHandler(_request_log_handler)
        _request_log_handler.close()
    handler = logging.FileHandler(log_path)
    handler.setFormatter(logging.Formatter(fmt=_LOG_FORMAT, datefmt=_LOG_DATEFMT))
    root.addHandler(handler)
    root.setLevel(logging.INFO)
    _request_log_handler = handler


def _prepare_request(features_list, api_name):
    """Run the preparation steps shared by all three endpoints.

    Builds the feature frame, loads the config, applies the feature weights,
    creates the ``<api_name>_api`` directory next to this file and dumps the
    weighted frame there as an .xlsx file (its path is stored in
    ``config['data_path']``).

    Returns:
        (config, feature_label_weighted, static_dir, now, start_time) where
        *now* is the timestamp string used in every artifact name and
        *start_time* is the time.time() value taken after weighting.
    """
    all_features = create_feature_df(features_list)
    config = _load_config()
    feature_names = config['feature_names']
    feature_weights = config['feature_weights']
    feature_label_weighted = apply_feature_weights(all_features, feature_names, feature_weights)

    start_time = time.time()

    # Artifact directory lives beside this file.
    static_dir = _artifact_path(os.path.dirname(__file__), f"{api_name}_api")
    os.makedirs(static_dir, exist_ok=True)

    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    data_path = _artifact_path(static_dir, f"{api_name}_feature_label_weighted_{now}.xlsx")
    config['data_path'] = data_path
    feature_label_weighted.to_excel(data_path, index=False)
    return config, feature_label_weighted, static_dir, now, start_time


def _column_means(rows):
    """Return the per-position mean across *rows* (equal-length sequences)."""
    return [sum(row[i] for row in rows) / len(rows) for i in range(len(rows[0]))]


@app.post("/train/")
async def train_model(request: Request, features_list: List[Features]):
    """Train a model on the posted features.

    Writes the weighted data, model, log and result images into ``train_api``
    and returns the classification metrics together with download URLs for
    the model, log and data files.

    The returned metrics are those of the LAST training run; the averages
    over all runs are only written to the log.
    """
    config, _, static_dir, now, start_time = _prepare_request(features_list, "train")

    # Where the trained model is saved.
    config['model_path'] = _artifact_path(static_dir, f"train_model_{now}.pth")

    _configure_request_logging(_artifact_path(static_dir, f"train_log_{now}.log"))

    # Image paths for the training-progress and evaluation-result plots.
    config['train_process_path'] = _artifact_path(static_dir, f"train_progress_img_{now}.png")
    config['evaluate_result_path'] = _artifact_path(static_dir, f"evaluate_result_img_{now}.png")

    log_print("config: " + str(config))

    ml_model = MLModel(config)

    list_avg_f1 = []
    list_wrong_percentage = []
    list_precision = []
    list_recall = []
    list_f1 = []

    # A single run when training on all data, otherwise the configured count.
    train_times = 1 if config['data_train'] == 'all' else config["experiments_count"]
    for _ in range(train_times):
        avg_f1, wrong_percentage, precision, recall, f1 = ml_model.train_detect()
        list_avg_f1.append(avg_f1)
        list_wrong_percentage.append(wrong_percentage)
        list_precision.append(precision)
        list_recall.append(recall)
        list_f1.append(f1)

    mean_avg_f1 = sum(list_avg_f1) / len(list_avg_f1)
    mean_wrong = sum(list_wrong_percentage) / len(list_wrong_percentage)
    log_print(f"Result: Avg F1: {mean_avg_f1:.4f} Avg Wrong Percentage: {mean_wrong:.2f}%")
    log_print(f"Result: Avg Precision: {_column_means(list_precision)} | {np.mean(list_precision)}")
    log_print(f"Result: Avg Recall: {_column_means(list_recall)} | {np.mean(list_recall)}")
    log_print(f"Result: Avg F1: {_column_means(list_f1)} | {np.mean(list_f1)}")

    end_time = time.time()
    log_print("预测耗时: " + str(end_time - start_time) + " 秒")

    # Make sure everything is on disk before the log URL is handed out.
    flush_log()

    # URLs are relative to the static mounts, not to the working directory.
    model_file_url = f"{request.base_url}train_api/train_model_{now}.pth"
    log_file_url = f"{request.base_url}train_api/train_log_{now}.log"
    data_file_url = f"{request.base_url}train_api/train_feature_label_weighted_{now}.xlsx"

    return {
        "classification_result": ClassificationResult(
            precision=precision,
            recall=recall,
            f1=f1,
            wrong_percentage=wrong_percentage
        ),
        "data_file": {
            "model_file_url": model_file_url,
            "log_file_url": log_file_url,
            "data_file_url": data_file_url
        }
    }


@app.post("/evaluate/")
async def evaluate_model(request: Request, features_list: List[Features]):
    """Evaluate the stored model on the posted, labelled features.

    Writes the weighted data and log into ``evaluate_api`` and returns the
    classification metrics together with download URLs for those two files.
    """
    config, feature_label_weighted, static_dir, now, start_time = _prepare_request(
        features_list, "evaluate"
    )

    # Image path for the evaluation-result plot.
    config['evaluate_result_path'] = _artifact_path(static_dir, f"evaluate_result_img_{now}.png")

    _configure_request_logging(_artifact_path(static_dir, f"evaluate_log_{now}.log"))

    # Split the weighted frame into feature matrix and label vector.
    X = feature_label_weighted[config['feature_names']].values
    y = feature_label_weighted[config['label_name']].values

    ml_model = MLModel(config)
    ml_model.load_model()
    avg_f1, wrong_percentage, precision, recall, f1 = ml_model.evaluate_model(X, y)

    end_time = time.time()
    log_print("预测耗时: " + str(end_time - start_time) + " 秒")

    # Make sure everything is on disk before the log URL is handed out.
    flush_log()

    # URLs are relative to the static mounts, not to the working directory.
    log_file_url = f"{request.base_url}evaluate_api/evaluate_log_{now}.log"
    data_file_url = f"{request.base_url}evaluate_api/evaluate_feature_label_weighted_{now}.xlsx"

    return {
        "classification_result": ClassificationResult(
            precision=precision,
            recall=recall,
            f1=f1,
            wrong_percentage=wrong_percentage
        ),
        "data_file": {
            "log_file_url": log_file_url,
            "data_file_url": data_file_url
        }
    }


@app.post("/inference/")
async def inference_model(request: Request, features_list: List[Features]):
    """Run the stored model on the posted features and return its predictions.

    Writes the weighted data and log into ``inference_api``.
    """
    config, feature_label_weighted, static_dir, now, start_time = _prepare_request(
        features_list, "inference"
    )

    _configure_request_logging(_artifact_path(static_dir, f"inference_log_{now}.log"))

    # Feature matrix only; inference has no labels.
    X = feature_label_weighted[config['feature_names']].values

    ml_model = MLModel(config)
    ml_model.load_model()
    predictions = ml_model.inference_model(X)

    end_time = time.time()
    log_print("预测耗时: " + str(end_time - start_time) + " 秒")
    log_print("预测结果: " + str(predictions))

    # Make sure everything is on disk before responding.
    flush_log()

    return PredictionResult(predictions=predictions)


# FastAPI start-up when the file is executed directly.
# NOTE(review): the static mounts below are only created in this branch, so
# the download URLs returned by the endpoints resolve only when the service
# is started via this script.
if __name__ == "__main__":
    name_app = os.path.basename(__file__)[0:-3]  # script name without ".py"; currently unused
    # NOTE(review): log_config is built but never passed to uvicorn.run().
    log_config = {
        "version": 1,
        "disable_existing_loggers": True,
        "handlers": {
            "file_handler": {
                "class": "logging.FileHandler",
                "filename": "logfile.log",
            },
        },
        "root": {
            "handlers": ["file_handler"],
            "level": "INFO",
        },
    }

    # Artifact directories live beside this file and are served statically.
    static_dir_train = os.path.abspath(os.path.join(os.path.dirname(__file__), "train_api"))
    static_dir_evaluate = os.path.abspath(os.path.join(os.path.dirname(__file__), "evaluate_api"))
    static_dir_inference = os.path.abspath(os.path.join(os.path.dirname(__file__), "inference_api"))
    os.makedirs(static_dir_train, exist_ok=True)
    os.makedirs(static_dir_evaluate, exist_ok=True)
    os.makedirs(static_dir_inference, exist_ok=True)

    app.mount("/train_api", StaticFiles(directory=static_dir_train), name="static_dir_train")
    app.mount("/evaluate_api", StaticFiles(directory=static_dir_evaluate), name="static_dir_evaluate")
    app.mount("/inference_api", StaticFiles(directory=static_dir_inference), name="static_dir_inference")

    uvicorn.run(app, host="0.0.0.0", port=3397, reload=False)