You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

423 lines
17 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import time
import datetime
import logging
import shutil
import uvicorn
import schedule
import threading
import yaml
import numpy as np
from fastapi import FastAPI, Request, File, UploadFile
from pydantic import BaseModel
from typing import List
import atexit
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from utils.feature_process import create_feature_df, apply_feature_weights, Features
from utils.common import MLModel
app = FastAPI()
# Switch controlling whether log_print also echoes messages to stdout
PRINT_LOG = True
# Retention period in days; used as the default age limit of clean_old_files
DAYS = 1
# Directories (relative to this file) swept by the periodic cleanup
DIRS = ["train_api", "inference_api", "evaluate_api"]
# Absolute path of the YAML configuration file (config/config.yaml next to this file)
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "config/config.yaml"))
# Default logging setup: INFO level, written to logfile.log next to this file
log_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "logfile.log"))
logging.basicConfig(filename=log_path, level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
def log_print(message):
    """Record ``message`` at INFO level and echo it to stdout when PRINT_LOG is set."""
    logging.info(message)
    if not PRINT_LOG:
        return
    print(message)
# Make sure pending log records are written out
def flush_log():
    """Flush every handler currently attached to the root logger."""
    root_logger = logging.getLogger()
    for log_handler in root_logger.handlers:
        log_handler.flush()
# FastAPI response model returned by the inference endpoint
class PredictionResult(BaseModel):
    # Predictions as produced by MLModel.inference_model; the element type is not constrained here
    predictions: list
# FastAPI response model carrying the classification metrics returned by the
# train and evaluate endpoints
class ClassificationResult(BaseModel):
    # presumably one entry per class for each of the three lists — TODO confirm against MLModel
    precision: list
    recall: list
    f1: list
    # train_model logs the averaged value with a "%" suffix, so presumably a percentage — TODO confirm
    wrong_percentage: float
# Allow cross-origin requests from any domain
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# restricted for credentialed requests — confirm against the FastAPI/Starlette
# CORS documentation whether explicit origins should be listed instead.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)
# Dynamically redirect root-logger output to a new log file
def update_log_handler(log_path):
    """Replace all root-logger handlers with a single file handler.

    Args:
        log_path: Path of the log file that should receive subsequent records.

    Raises:
        OSError: If the log file cannot be opened. In that case the existing
            handlers are left untouched.
    """
    # Build the replacement first so a bad path cannot leave the root logger
    # without any handler.
    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))

    logger = logging.getLogger()
    # Iterate over a copy because the handler list is mutated inside the loop.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        # Close the detached handler so its file descriptor is released;
        # without this every call leaks the previously opened log file.
        handler.close()
    logger.addHandler(file_handler)
# Remove files that are older than the retention period
def clean_old_files(directory, days=DAYS):
    """Delete every file under ``directory`` whose mtime is older than ``days`` days.

    The tree is walked recursively with ``os.walk``; directories themselves
    are never removed. A file that cannot be inspected or deleted (already
    gone, locked, insufficient permission) is reported and skipped so the
    remaining files are still processed.

    Args:
        directory: Root directory to sweep.
        days: Maximum file age in days; defaults to ``DAYS``.
    """
    cutoff = time.time() - (days * 86400)  # 86400 seconds per day
    for root, _dirs, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                if os.path.getmtime(file_path) >= cutoff:
                    continue
                os.remove(file_path)
            except OSError as exc:
                # This sweep runs unattended in the scheduler thread; a single
                # failing file must not abort the sweep or kill that thread.
                print(f"Failed to remove old file: {file_path} ({exc})")
                continue
            print(f"Removed old file: {file_path}")
# Periodic cleanup job
def schedule_cleanup():
    """Run clean_old_files on every directory listed in DIRS, resolved next to this file."""
    base_dir = os.path.dirname(__file__)
    for folder_name in DIRS:
        clean_old_files(os.path.abspath(os.path.join(base_dir, folder_name)))
# Start the recurring cleanup schedule
def start_scheduler():
    """Register schedule_cleanup to run every hour, then poll the scheduler forever.

    This function never returns; the startup code runs it in a daemon thread.
    """
    hourly_job = schedule.every(1).hours
    hourly_job.do(schedule_cleanup)
    while True:
        schedule.run_pending()
        time.sleep(1)
# Training endpoint
@app.post("/train/")
async def train_model(request: Request, features_list: List[Features]):
    """Train a model from the posted features and return metrics and artifact URLs.

    The weighted feature table and a per-request log file are written to the
    ``train_api`` directory next to this file; paths for the trained model and
    the result images are handed to ``MLModel`` through ``config``. All file
    names carry a timestamp with one-second resolution.

    Args:
        request: Incoming request; only ``base_url`` is used, to build the
            download URLs.
        features_list: Feature objects posted by the client.

    Returns:
        A dict with ``classification_result`` (metrics of the last training
        run) and ``data_file`` (URLs of the generated artifacts).
    """
    # Build the feature table from the posted feature objects
    all_features = create_feature_df(features_list)
    # Load the YAML configuration
    # NOTE(review): yaml.FullLoader is acceptable for a trusted local file;
    # consider yaml.safe_load if the config can ever come from outside.
    with open(config_path, 'r') as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    feature_names = config['feature_names']
    feature_weights = config['feature_weights']
    # Apply the configured feature weights
    feature_label_weighted = apply_feature_weights(all_features, feature_names, feature_weights)
    start_time = time.time()  # start of the timed section
    # Artifact directory, located alongside this .py file
    static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "train_api"))
    os.makedirs(static_dir, exist_ok=True)
    # Pre-training setup: timestamped paths for every artifact
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    data_path = os.path.abspath(os.path.join(static_dir, f"train_feature_label_weighted_{now}.xlsx"))
    config['data_path'] = data_path
    feature_label_weighted.to_excel(data_path, index=False)
    # Where the trained model is to be saved
    train_model_path = os.path.abspath(os.path.join(static_dir, f"train_model_{now}.pth"))
    config['train_model_path'] = train_model_path
    # Route logging to a per-request log file
    log_path = os.path.abspath(os.path.join(static_dir, f"train_log_{now}.log"))
    update_log_handler(log_path)
    # Paths for the training-progress and evaluation-result images
    train_process_path = os.path.abspath(os.path.join(static_dir, f"train_progress_img_{now}.png"))
    config['train_process_path'] = train_process_path
    evaluate_result_path = os.path.abspath(os.path.join(static_dir, f"evaluate_result_img_{now}.png"))
    config['evaluate_result_path'] = evaluate_result_path
    log_print("config: " + str(config))
    # Start training with a fresh MLModel instance
    ml_model = MLModel(config)
    list_avg_f1 = []
    list_wrong_percentage = []
    list_precision = []
    list_recall = []
    list_f1 = []
    # A single run unless experimental mode asks for repeated experiments
    train_times = 1 if config['experimental_mode'] == False else config["experiments_count"]
    for _ in range(train_times):
        avg_f1, wrong_percentage, precision, recall, f1 = ml_model.train_detect()
        list_avg_f1.append(avg_f1)
        list_wrong_percentage.append(wrong_percentage)
        list_precision.append(precision)
        list_recall.append(recall)
        list_f1.append(f1)
    # Log metrics averaged over all runs
    log_print(f"Result: Avg F1: {sum(list_avg_f1) / len(list_avg_f1):.4f} Avg Wrong Percentage: {sum(list_wrong_percentage) / len(list_wrong_percentage):.2f}%")
    log_print(f"Result: Avg Precision: {[sum(p[i] for p in list_precision) / len(list_precision) for i in range(len(list_precision[0]))]} | {np.mean(list_precision)}")
    log_print(f"Result: Avg Recall: {[sum(r[i] for r in list_recall) / len(list_recall) for i in range(len(list_recall[0]))]} | {np.mean(list_recall)}")
    log_print(f"Result: Avg F1: {[sum(f1[i] for f1 in list_f1) / len(list_f1) for i in range(len(list_f1[0]))]} | {np.mean(list_f1)}")
    end_time = time.time()  # end of the timed section
    log_print("预测耗时: " + str(end_time - start_time) + "")  # log the elapsed time
    # Optionally replace the model currently used for detection
    if config["replace_model"] == True:
        # Resolve a relative model path against this file's directory
        if not os.path.isabs(config["model_path"]):
            abs_model_path = os.path.abspath(os.path.join(os.path.dirname(__file__), config["model_path"]))
            config["model_path"] = abs_model_path
        shutil.copyfile(config["train_model_path"], config["model_path"])
        log_print(f"Model file has been copied from {config['train_model_path']} to {config['model_path']}")
    # Flush the log now. Registering flush_log with atexit on every request
    # would only grow the exit-handler table and would not flush at this point.
    flush_log()
    # Download URLs; the path prefix is the app.mount static route, not the
    # process working directory
    model_file_url = f"{request.base_url}train_api/train_model_{now}.pth"
    log_file_url = f"{request.base_url}train_api/train_log_{now}.log"
    data_file_url = f"{request.base_url}train_api/train_feature_label_weighted_{now}.xlsx"
    train_process_img_url = f"{request.base_url}train_api/train_progress_img_{now}.png"
    evaluate_result_img_url = f"{request.base_url}train_api/evaluate_result_img_{now}.png"
    # Metrics of the last run plus the artifact URLs
    return {
        "classification_result": ClassificationResult(
            precision=precision,
            recall=recall,
            f1=f1,
            wrong_percentage=wrong_percentage
        ),
        "data_file": {
            "model_file_url": model_file_url,
            "log_file_url": log_file_url,
            "data_file_url": data_file_url,
            "train_process_img_url": train_process_img_url,
            "evaluate_result_img_url": evaluate_result_img_url
        }
    }
# Evaluation endpoint
@app.post("/evaluate/")
async def evaluate_model(request: Request, features_list: List[Features]):
    """Evaluate the current model on the posted features and return its metrics.

    The weighted feature table and a per-request log file are written to the
    ``evaluate_api`` directory next to this file. If the configured model file
    does not exist, the configured default model is copied into its place
    first.

    Args:
        request: Incoming request; only ``base_url`` is used, to build the
            download URLs.
        features_list: Feature objects posted by the client.

    Returns:
        A dict with ``classification_result`` (evaluation metrics) and
        ``data_file`` (URLs of the generated artifacts).
    """
    # Build the feature table from the posted feature objects
    all_features = create_feature_df(features_list)
    # Load the YAML configuration
    # NOTE(review): yaml.FullLoader is acceptable for a trusted local file;
    # consider yaml.safe_load if the config can ever come from outside.
    with open(config_path, 'r') as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    feature_names = config['feature_names']
    feature_weights = config['feature_weights']
    # Apply the configured feature weights
    feature_label_weighted = apply_feature_weights(all_features, feature_names, feature_weights)
    start_time = time.time()  # start of the timed section
    # Artifact directory, located alongside this .py file
    static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "evaluate_api"))
    os.makedirs(static_dir, exist_ok=True)
    # Timestamped paths for every artifact
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    data_path = os.path.abspath(os.path.join(static_dir, f"evaluate_feature_label_weighted_{now}.xlsx"))
    config['data_path'] = data_path
    feature_label_weighted.to_excel(data_path, index=False)
    # Path for the evaluation-result image
    evaluate_result_path = os.path.abspath(os.path.join(static_dir, f"evaluate_result_img_{now}.png"))
    config['evaluate_result_path'] = evaluate_result_path
    # Route logging to this request's log file before anything is logged, so
    # the model-fallback message below does not end up in a previous log file
    log_path = os.path.abspath(os.path.join(static_dir, f"evaluate_log_{now}.log"))
    update_log_handler(log_path)
    # Resolve a relative model path against this file's directory
    if not os.path.isabs(config["model_path"]):
        abs_model_path = os.path.abspath(os.path.join(os.path.dirname(__file__), config["model_path"]))
        config["model_path"] = abs_model_path
    # Fall back to the default model when the configured one is missing
    if not os.path.exists(config["model_path"]):
        if not os.path.isabs(config['default_model_path']):
            config['default_model_path'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), config['default_model_path'])
        os.makedirs(os.path.dirname(config["model_path"]), exist_ok=True)
        shutil.copyfile(config['default_model_path'], config["model_path"])
        log_print(f"Model file not found. Copied default model from {config['default_model_path']} to {config['model_path']}")
    # Features and labels
    X = feature_label_weighted[config['feature_names']].values
    y = feature_label_weighted[config['label_name']].values
    # Create the MLModel instance and load the stored model
    ml_model = MLModel(config)
    ml_model.load_model()
    avg_f1, wrong_percentage, precision, recall, f1 = ml_model.evaluate_model(X, y)
    end_time = time.time()  # end of the timed section
    log_print("预测耗时: " + str(end_time - start_time) + "")  # log the elapsed time
    # Flush the log now. Registering flush_log with atexit on every request
    # would only grow the exit-handler table and would not flush at this point.
    flush_log()
    # Download URLs; the path prefix is the app.mount static route, not the
    # process working directory
    log_file_url = f"{request.base_url}evaluate_api/evaluate_log_{now}.log"
    data_file_url = f"{request.base_url}evaluate_api/evaluate_feature_label_weighted_{now}.xlsx"
    evaluate_result_img_url = f"{request.base_url}evaluate_api/evaluate_result_img_{now}.png"
    # Metrics plus the artifact URLs
    return {
        "classification_result": ClassificationResult(
            precision=precision,
            recall=recall,
            f1=f1,
            wrong_percentage=wrong_percentage
        ),
        "data_file": {
            "log_file_url": log_file_url,
            "data_file_url": data_file_url,
            "evaluate_result_img_url": evaluate_result_img_url
        }
    }
# Inference endpoint
@app.post("/inference/")
async def inference_model(request: Request, features_list: List[Features]):
    """Run the current model on the posted features and return its predictions.

    The weighted feature table and a per-request log file are written to the
    ``inference_api`` directory next to this file. If the configured model
    file does not exist, the configured default model is copied into its
    place first.

    Args:
        request: Incoming request (not used by the body).
        features_list: Feature objects posted by the client.

    Returns:
        A ``PredictionResult`` wrapping the model's predictions.
    """
    # Build the feature table from the posted feature objects
    all_features = create_feature_df(features_list)
    # Load the YAML configuration
    # NOTE(review): yaml.FullLoader is acceptable for a trusted local file;
    # consider yaml.safe_load if the config can ever come from outside.
    with open(config_path, 'r') as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    feature_names = config['feature_names']
    feature_weights = config['feature_weights']
    # Apply the configured feature weights
    feature_label_weighted = apply_feature_weights(all_features, feature_names, feature_weights)
    start_time = time.time()  # start of the timed section
    # Artifact directory, located alongside this .py file
    static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "inference_api"))
    os.makedirs(static_dir, exist_ok=True)
    # Timestamped paths for every artifact
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    data_path = os.path.abspath(os.path.join(static_dir, f"inference_feature_label_weighted_{now}.xlsx"))
    config['data_path'] = data_path
    feature_label_weighted.to_excel(data_path, index=False)
    # Route logging to a per-request log file
    log_path = os.path.abspath(os.path.join(static_dir, f"inference_log_{now}.log"))
    update_log_handler(log_path)
    # Resolve a relative model path against this file's directory
    if not os.path.isabs(config["model_path"]):
        abs_model_path = os.path.abspath(os.path.join(os.path.dirname(__file__), config["model_path"]))
        config["model_path"] = abs_model_path
    # Fall back to the default model when the configured one is missing
    if not os.path.exists(config["model_path"]):
        if not os.path.isabs(config['default_model_path']):
            config['default_model_path'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), config['default_model_path'])
        os.makedirs(os.path.dirname(config["model_path"]), exist_ok=True)
        shutil.copyfile(config['default_model_path'], config["model_path"])
        log_print(f"Model file not found. Copied default model from {config['default_model_path']} to {config['model_path']}")
    # Feature matrix (inference has no labels)
    X = feature_label_weighted[config['feature_names']].values
    # Create the MLModel instance and load the stored model
    ml_model = MLModel(config)
    ml_model.load_model()
    predictions = ml_model.inference_model(X)
    end_time = time.time()  # end of the timed section
    log_print("预测耗时: " + str(end_time - start_time) + "")  # log the elapsed time
    log_print("预测结果: " + str(predictions))
    # Flush the log now. Registering flush_log with atexit on every request
    # would only grow the exit-handler table and would not flush at this point.
    flush_log()
    # Return the predictions
    return PredictionResult(predictions=predictions)
# Model upload endpoint
@app.post("/upload_model/")
async def upload_model(file: UploadFile = File(...)):
    """Store the uploaded file as ``models/psy.pth`` next to this module.

    The uploaded file's own name is ignored; the target name is fixed.

    Args:
        file: Uploaded model file.

    Returns:
        A dict with a success ``message`` and the ``file_path`` written.
    """
    # Directory holding the model files
    models_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "models"))
    os.makedirs(models_dir, exist_ok=True)
    file_path = os.path.join(models_dir, "psy.pth")
    # Read the whole upload BEFORE opening the target: opening with "wb"
    # truncates the file, so a failed read would otherwise destroy the
    # model that is already stored there.
    payload = await file.read()
    with open(file_path, "wb") as buffer:
        buffer.write(payload)
    return {"message": "模型上传成功", "file_path": file_path}
# FastAPI startup configuration; everything below runs only when this file is
# executed as a script
if __name__ == "__main__":
    # Current time formatted as a string
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Log the program start time
    log_print(f"Program started at {current_time}")
    # NOTE(review): name_app is not used anywhere in the visible code
    name_app = os.path.basename(__file__)[0:-3] # script file name minus its last three characters (the ".py" suffix)
    # NOTE(review): log_config is built but never passed to uvicorn.run below —
    # confirm whether it was meant to be supplied as its log_config argument
    log_config = {
        "version": 1,
        "disable_existing_loggers": True,
        "handlers": {
            "file_handler": {
                "class": "logging.FileHandler",
                "filename": "logfile.log",
            },
        },
        "root": {
            "handlers": ["file_handler"],
            "level": "INFO",
        },
    }
    # Create the directories that hold the served artifacts
    static_dir_train = os.path.abspath(os.path.join(os.path.dirname(__file__), "train_api")) # artifact directories live alongside this .py file
    static_dir_evaluate = os.path.abspath(os.path.join(os.path.dirname(__file__), "evaluate_api"))
    static_dir_inference = os.path.abspath(os.path.join(os.path.dirname(__file__), "inference_api"))
    static_dir_models = os.path.abspath(os.path.join(os.path.dirname(__file__), "models"))
    os.makedirs(static_dir_train, exist_ok=True)
    os.makedirs(static_dir_evaluate, exist_ok=True)
    os.makedirs(static_dir_inference, exist_ok=True)
    os.makedirs(static_dir_models, exist_ok=True)
    # Serve each directory as static files under the matching URL prefix.
    # These mounts happen only on this script path, not when the module is imported.
    app.mount("/train_api", StaticFiles(directory=static_dir_train), name="static_dir_train")
    app.mount("/evaluate_api", StaticFiles(directory=static_dir_evaluate), name="static_dir_evaluate")
    app.mount("/inference_api", StaticFiles(directory=static_dir_inference), name="static_dir_inference")
    app.mount("/models", StaticFiles(directory=static_dir_models), name="static_dir_models")
    # Run the cleanup scheduler in a daemon thread so it does not block shutdown
    scheduler_thread = threading.Thread(target=start_scheduler)
    scheduler_thread.daemon = True
    scheduler_thread.start()
    uvicorn.run(app, host="0.0.0.0", port=8088, reload=False)