"""FastAPI service exposing train / evaluate / inference / model-upload endpoints.

Each request writes its artifacts (weighted feature sheet, log file, images,
model file) into a per-endpoint folder next to this script.  When the script
is run directly those folders are mounted as static directories and a
background thread periodically deletes old artifacts.
"""
import atexit
import datetime
import logging
import os
import shutil
import threading
import time
from typing import List

import numpy as np
import schedule
import uvicorn
import yaml
from fastapi import FastAPI, Request, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from utils.feature_process import create_feature_df, apply_feature_weights, Features
from utils.common import MLModel

app = FastAPI()

# Switch controlling whether log messages are also printed to stdout.
PRINT_LOG = True
# Number of days generated files are kept before cleanup removes them.
DAYS = 1
# Folders (relative to this script) swept by the periodic cleanup.
DIRS = ["train_api", "inference_api", "evaluate_api"]

# Location of the YAML configuration file.
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "config/config.yaml"))

# Initial logging configuration (replaced per request by update_log_handler).
log_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "logfile.log"))
logging.basicConfig(filename=log_path, level=logging.INFO,
                    format='%(asctime)s %(levelname)s: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')


def log_print(message):
    """Log *message* at INFO level and, when PRINT_LOG is set, print it too."""
    logging.info(message)
    if PRINT_LOG:
        print(message)


def flush_log():
    """Flush every handler attached to the root logger so logs reach disk."""
    for handler in logging.getLogger().handlers:
        handler.flush()


# Flush once at interpreter exit.  The endpoints call flush_log() directly
# instead of registering a new exit hook on every request.
atexit.register(flush_log)


class PredictionResult(BaseModel):
    """Response model of the inference endpoint."""

    predictions: list


class ClassificationResult(BaseModel):
    """Classification metrics returned by the train and evaluate endpoints."""

    precision: list
    recall: list
    f1: list
    wrong_percentage: float


# Allow cross-origin requests from any domain.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)


def update_log_handler(log_path):
    """Point the root logger at a new log file.

    All handlers currently attached to the root logger are removed and closed
    (closing releases the underlying file handle), then a single INFO-level
    FileHandler writing to *log_path* is attached.

    Args:
        log_path: Path of the log file that should receive subsequent records.
    """
    logger = logging.getLogger()
    # Iterate over a copy: removeHandler mutates logger.handlers.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        handler.close()

    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s',
                                                datefmt='%Y-%m-%d %H:%M:%S'))
    logger.addHandler(file_handler)


def clean_old_files(directory, days=DAYS):
    """Delete files under *directory* whose mtime is older than *days* days.

    The tree is walked recursively.  A file that cannot be inspected or
    removed (already deleted, locked, permission denied, ...) is reported and
    skipped so one bad file does not abort the sweep.

    Args:
        directory: Root folder to sweep.
        days: Age threshold in days; defaults to DAYS.
    """
    cutoff = time.time() - (days * 86400)  # timestamp `days` days ago
    for root, _dirs, files in os.walk(directory):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            try:
                if os.path.getmtime(file_path) >= cutoff:
                    continue
                os.remove(file_path)
            except OSError as exc:
                log_print(f"Failed to remove old file: {file_path} ({exc})")
                continue
            log_print(f"Removed old file: {file_path}")


def schedule_cleanup():
    """Run clean_old_files on every folder listed in DIRS."""
    script_dir = os.path.dirname(__file__)
    for directory in DIRS:
        clean_old_files(os.path.abspath(os.path.join(script_dir, directory)))


def start_scheduler():
    """Register the hourly cleanup job and run the scheduler loop forever.

    Intended to run in a daemon thread.  Exceptions raised by a job are
    logged instead of propagating, otherwise the first failure would end the
    thread and silently stop all future cleanups.
    """
    schedule.every(1).hours.do(schedule_cleanup)
    while True:
        try:
            schedule.run_pending()
        except Exception:  # top-level boundary of the scheduler thread
            logging.exception("Scheduled cleanup failed")
        time.sleep(1)


def _load_config():
    """Read and return the YAML configuration stored at config_path."""
    # NOTE(review): yaml.load with FullLoader is kept as in the original; the
    # file is a local config, but yaml.safe_load would be the safer choice if
    # the config only contains plain data - confirm before switching.
    with open(config_path, 'r') as f:
        return yaml.load(f, Loader=yaml.FullLoader)


def _make_output_dir(name):
    """Create (if needed) and return the absolute folder *name* next to this script."""
    out_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), name))
    os.makedirs(out_dir, exist_ok=True)
    return out_dir


def _artifact_path(out_dir, file_name):
    """Return the absolute path of *file_name* inside *out_dir*."""
    return os.path.abspath(os.path.join(out_dir, file_name))


def _resolve_model_path(config):
    """Rewrite config["model_path"] in place as an absolute path.

    Relative paths are interpreted relative to this script's folder.
    """
    if not os.path.isabs(config["model_path"]):
        config["model_path"] = os.path.abspath(
            os.path.join(os.path.dirname(__file__), config["model_path"]))


def _ensure_model_file(config):
    """Make sure the model file exists, copying the default model if it does not.

    config["model_path"] is made absolute first.  When no file exists there,
    config["default_model_path"] (made absolute if relative) is copied to it.
    """
    _resolve_model_path(config)
    if os.path.exists(config["model_path"]):
        return
    if not os.path.isabs(config['default_model_path']):
        config['default_model_path'] = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), config['default_model_path'])
    os.makedirs(os.path.dirname(config["model_path"]), exist_ok=True)
    shutil.copyfile(config['default_model_path'], config["model_path"])
    log_print(f"Model file not found. Copied default model from {config['default_model_path']} to {config['model_path']}")


def _columnwise_mean(rows):
    """Return, for each index of rows[0], the mean of that index over all rows."""
    return [sum(row[i] for row in rows) / len(rows) for i in range(len(rows[0]))]


@app.post("/train/")
async def train_model(request: Request, features_list: List[Features]):
    """Train a model on the posted features and return metrics plus file URLs.

    The weighted features are saved as an Excel sheet, the root logger is
    redirected to a per-request log file, and MLModel.train_detect() is run
    once (or config["experiments_count"] times in experimental mode).  When
    config["replace_model"] is set, the trained model file is copied over
    config["model_path"].

    Args:
        request: Incoming request; its base_url is used to build download URLs.
        features_list: Feature records to train on.

    Returns:
        A dict with "classification_result" (metrics of the last training run)
        and "data_file" (URLs of the generated model, log, data and images).
    """
    # NOTE(review): training is invoked synchronously inside an async handler,
    # so the event loop is blocked while it runs - confirm this is acceptable.
    all_features = create_feature_df(features_list)

    config = _load_config()
    feature_names = config['feature_names']
    feature_weights = config['feature_weights']
    feature_label_weighted = apply_feature_weights(all_features, feature_names, feature_weights)

    start_time = time.time()

    # Output folder sits next to this script.
    static_dir = _make_output_dir("train_api")

    # All artifacts of this request share one timestamp suffix.
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    data_path = _artifact_path(static_dir, f"train_feature_label_weighted_{now}.xlsx")
    config['data_path'] = data_path
    feature_label_weighted.to_excel(data_path, index=False)

    config['train_model_path'] = _artifact_path(static_dir, f"train_model_{now}.pth")

    # Redirect logging to this request's log file.
    update_log_handler(_artifact_path(static_dir, f"train_log_{now}.log"))

    # Image paths for the training progress and evaluation result plots.
    config['train_process_path'] = _artifact_path(static_dir, f"train_progress_img_{now}.png")
    config['evaluate_result_path'] = _artifact_path(static_dir, f"evaluate_result_img_{now}.png")
    log_print("config: " + str(config))

    ml_model = MLModel(config)
    list_avg_f1 = []
    list_wrong_percentage = []
    list_precision = []
    list_recall = []
    list_f1 = []
    # Equality comparison kept on purpose to preserve the original handling of
    # non-boolean config values.
    train_times = 1 if config['experimental_mode'] == False else config["experiments_count"]  # noqa: E712
    for _ in range(train_times):
        avg_f1, wrong_percentage, precision, recall, f1 = ml_model.train_detect()
        list_avg_f1.append(avg_f1)
        list_wrong_percentage.append(wrong_percentage)
        list_precision.append(precision)
        list_recall.append(recall)
        list_f1.append(f1)

    log_print(f"Result: Avg F1: {sum(list_avg_f1) / len(list_avg_f1):.4f} Avg Wrong Percentage: {sum(list_wrong_percentage) / len(list_wrong_percentage):.2f}%")
    log_print(f"Result: Avg Precision: {_columnwise_mean(list_precision)} | {np.mean(list_precision)}")
    log_print(f"Result: Avg Recall: {_columnwise_mean(list_recall)} | {np.mean(list_recall)}")
    log_print(f"Result: Avg F1: {_columnwise_mean(list_f1)} | {np.mean(list_f1)}")

    end_time = time.time()
    log_print("预测耗时: " + str(end_time - start_time) + " 秒")

    # Optionally replace the model used by the evaluate/inference endpoints.
    if config["replace_model"] == True:  # noqa: E712 - original semantics kept
        _resolve_model_path(config)
        shutil.copyfile(config["train_model_path"], config["model_path"])
        log_print(f"Model file has been copied from {config['train_model_path']} to {config['model_path']}")

    # Make sure this request's log records are written to the file.
    flush_log()

    # URLs refer to the static mount of the output folder, not a filesystem path.
    base_url = request.base_url
    return {
        "classification_result": ClassificationResult(
            precision=precision,
            recall=recall,
            f1=f1,
            wrong_percentage=wrong_percentage
        ),
        "data_file": {
            "model_file_url": f"{base_url}train_api/train_model_{now}.pth",
            "log_file_url": f"{base_url}train_api/train_log_{now}.log",
            "data_file_url": f"{base_url}train_api/train_feature_label_weighted_{now}.xlsx",
            "train_process_img_url": f"{base_url}train_api/train_progress_img_{now}.png",
            "evaluate_result_img_url": f"{base_url}train_api/evaluate_result_img_{now}.png"
        }
    }


@app.post("/evaluate/")
async def evaluate_model(request: Request, features_list: List[Features]):
    """Evaluate the current model on the posted features.

    The weighted features are saved as an Excel sheet, the model file is
    restored from the default model if missing, logging is redirected to a
    per-request log file, and MLModel.evaluate_model(X, y) is run.

    Args:
        request: Incoming request; its base_url is used to build download URLs.
        features_list: Labeled feature records to evaluate on.

    Returns:
        A dict with "classification_result" (evaluation metrics) and
        "data_file" (URLs of the generated log, data sheet and result image).
    """
    all_features = create_feature_df(features_list)

    config = _load_config()
    feature_names = config['feature_names']
    feature_weights = config['feature_weights']
    feature_label_weighted = apply_feature_weights(all_features, feature_names, feature_weights)

    start_time = time.time()

    static_dir = _make_output_dir("evaluate_api")

    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    data_path = _artifact_path(static_dir, f"evaluate_feature_label_weighted_{now}.xlsx")
    config['data_path'] = data_path
    feature_label_weighted.to_excel(data_path, index=False)

    config['evaluate_result_path'] = _artifact_path(static_dir, f"evaluate_result_img_{now}.png")

    # Done before the log handler switch, as in the original flow, so the
    # "copied default model" message goes to the previously active log file.
    _ensure_model_file(config)

    update_log_handler(_artifact_path(static_dir, f"evaluate_log_{now}.log"))

    # Feature matrix and label vector.
    X = feature_label_weighted[config['feature_names']].values
    y = feature_label_weighted[config['label_name']].values

    ml_model = MLModel(config)
    ml_model.load_model()
    avg_f1, wrong_percentage, precision, recall, f1 = ml_model.evaluate_model(X, y)

    end_time = time.time()
    log_print("预测耗时: " + str(end_time - start_time) + " 秒")

    # Make sure this request's log records are written to the file.
    flush_log()

    base_url = request.base_url
    return {
        "classification_result": ClassificationResult(
            precision=precision,
            recall=recall,
            f1=f1,
            wrong_percentage=wrong_percentage
        ),
        "data_file": {
            "log_file_url": f"{base_url}evaluate_api/evaluate_log_{now}.log",
            "data_file_url": f"{base_url}evaluate_api/evaluate_feature_label_weighted_{now}.xlsx",
            "evaluate_result_img_url": f"{base_url}evaluate_api/evaluate_result_img_{now}.png"
        }
    }


@app.post("/inference/")
async def inference_model(request: Request, features_list: List[Features]):
    """Run the current model on the posted features and return its predictions.

    The weighted features are saved as an Excel sheet, logging is redirected
    to a per-request log file, the model file is restored from the default
    model if missing, and MLModel.inference_model(X) is run.

    Args:
        request: Incoming request (not otherwise used by this handler).
        features_list: Feature records to predict on.

    Returns:
        A PredictionResult wrapping the model's predictions.
    """
    all_features = create_feature_df(features_list)

    config = _load_config()
    feature_names = config['feature_names']
    feature_weights = config['feature_weights']
    feature_label_weighted = apply_feature_weights(all_features, feature_names, feature_weights)

    start_time = time.time()

    static_dir = _make_output_dir("inference_api")

    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    data_path = _artifact_path(static_dir, f"inference_feature_label_weighted_{now}.xlsx")
    config['data_path'] = data_path
    feature_label_weighted.to_excel(data_path, index=False)

    update_log_handler(_artifact_path(static_dir, f"inference_log_{now}.log"))

    _ensure_model_file(config)

    # Feature matrix.
    X = feature_label_weighted[config['feature_names']].values

    ml_model = MLModel(config)
    ml_model.load_model()
    predictions = ml_model.inference_model(X)

    end_time = time.time()
    log_print("预测耗时: " + str(end_time - start_time) + " 秒")
    log_print("预测结果: " + str(predictions))

    # Make sure this request's log records are written to the file.
    flush_log()

    return PredictionResult(predictions=predictions)


@app.post("/upload_model/")
async def upload_model(file: UploadFile = File(...)):
    """Store the uploaded file as models/psy.pth next to this script.

    Args:
        file: Uploaded model file.

    Returns:
        A dict with a confirmation message and the path the file was saved to.
    """
    models_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "models"))
    os.makedirs(models_dir, exist_ok=True)

    file_path = os.path.join(models_dir, "psy.pth")
    with open(file_path, "wb") as buffer:
        buffer.write(await file.read())

    return {"message": "模型上传成功", "file_path": file_path}


# FastAPI start-up configuration.
if __name__ == "__main__":
    # Log the program start time.
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_print(f"Program started at {current_time}")

    # NOTE(review): name_app and log_config are defined but not used below
    # (log_config is never passed to uvicorn.run) - confirm whether they can go.
    name_app = os.path.basename(__file__)[0:-3]  # script name without ".py"
    log_config = {
        "version": 1,
        "disable_existing_loggers": True,
        "handlers": {
            "file_handler": {
                "class": "logging.FileHandler",
                "filename": "logfile.log",
            },
        },
        "root": {
            "handlers": ["file_handler"],
            "level": "INFO",
        },
    }

    # Output folders live next to this script and are served as static files.
    script_dir = os.path.dirname(__file__)
    static_dir_train = os.path.abspath(os.path.join(script_dir, "train_api"))
    static_dir_evaluate = os.path.abspath(os.path.join(script_dir, "evaluate_api"))
    static_dir_inference = os.path.abspath(os.path.join(script_dir, "inference_api"))
    static_dir_models = os.path.abspath(os.path.join(script_dir, "models"))
    for static_dir in (static_dir_train, static_dir_evaluate,
                       static_dir_inference, static_dir_models):
        os.makedirs(static_dir, exist_ok=True)

    app.mount("/train_api", StaticFiles(directory=static_dir_train), name="static_dir_train")
    app.mount("/evaluate_api", StaticFiles(directory=static_dir_evaluate), name="static_dir_evaluate")
    app.mount("/inference_api", StaticFiles(directory=static_dir_inference), name="static_dir_inference")
    app.mount("/models", StaticFiles(directory=static_dir_models), name="static_dir_models")

    # Start the periodic cleanup in a daemon thread so it never blocks shutdown.
    scheduler_thread = threading.Thread(target=start_scheduler)
    scheduler_thread.daemon = True
    scheduler_thread.start()

    uvicorn.run(app, host="0.0.0.0", port=8088, reload=False)