You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
psy/utils/feature_processor.py

274 lines
12 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
特征处理模块
包含特征数据结构定义、特征转换、权重应用和归一化等功能
"""
from typing import List, Dict, Any, Union, Optional
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import pickle
from pydantic import BaseModel, Field
import logging
import shutil
class Features(BaseModel):
    """
    Feature record for a single sample.

    Attributes:
        somatization: Somatization symptom score.
        obsessive_compulsive: Obsessive-compulsive symptom score.
        interpersonal_sensitivity: Interpersonal sensitivity score.
        depression: Depression symptom score.
        anxiety: Anxiety symptom score.
        hostility: Hostility symptom score.
        terror: Phobic ("terror") symptom score.
        paranoia: Paranoid symptom score.
        psychoticism: Psychoticism score.
        other: Score for other symptoms.
        father_parenting_style: Father's parenting style
            (1: warmth and understanding, 0: other).
        mother_parenting_style: Mother's parenting style
            (1: warmth and understanding, 0: other).
        self_assessed_family_economic_condition: Family economic condition
            (2: impoverished, 1: relatively poor, 0: other).
        history_of_psychological_counseling: Counseling history
            (True: yes, False: no).
        absenteeism_above_average: Attendance status
            (True: above average, False: below average).
        academic_warning: Academic warning
            (True: warning issued, False: no warning).
        label: Class label (0-3).
    """
    # SCL scale scores; the original note gives their range as 0-4.
    # NOTE(review): the range is documented only, the fields do not enforce it.
    somatization: float = Field(..., description="躯体化症状得分")
    obsessive_compulsive: float = Field(..., description="强迫症状得分")
    interpersonal_sensitivity: float = Field(..., description="人际关系敏感得分")
    depression: float = Field(..., description="抑郁症状得分")
    anxiety: float = Field(..., description="焦虑症状得分")
    hostility: float = Field(..., description="敌对症状得分")
    terror: float = Field(..., description="恐怖症状得分")
    paranoia: float = Field(..., description="偏执症状得分")
    psychoticism: float = Field(..., description="精神病性得分")
    other: float = Field(..., description="其他症状得分")
    # Background-information features.
    father_parenting_style: int = Field(..., description="父亲教养方式")
    mother_parenting_style: int = Field(..., description="母亲教养方式")
    self_assessed_family_economic_condition: int = Field(..., description="家庭经济状况")
    history_of_psychological_counseling: bool = Field(..., description="心理咨询史")
    # Day-to-day behaviour features.
    absenteeism_above_average: bool = Field(..., description="出勤情况")
    academic_warning: bool = Field(..., description="学业预警")
    # Target label.
    label: int = Field(..., description="分类标签")
class FeatureProcessor:
    """
    Convert ``Features`` records into a table of digitized features.

    Attributes:
        config: Configuration dictionary.
        feature_mapping: Lookup tables read from
            ``config['features']['feature_mapping']``.
        feature_names: Feature name list read from
            ``config['features']['feature_names']``.
    """

    # Attribute names of the SCL factor scores, in output order.
    _SCL_FIELDS = (
        "somatization",
        "obsessive_compulsive",
        "interpersonal_sensitivity",
        "depression",
        "anxiety",
        "hostility",
        "terror",
        "paranoia",
        "psychoticism",
        "other",
    )
    # A factor score strictly above this value counts as a severe symptom.
    _SEVERE_THRESHOLD = 3.0
    # Divisor applied to the SCL scores that are emitted as features.
    _SCL_SCALE = 4

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config
        self.feature_mapping = config['features']['feature_mapping']
        self.feature_names = config['features']['feature_names']

    def _process_scl_features(self, features: Features) -> Dict[str, float]:
        """
        Collect the SCL factor scores of one sample.

        Args:
            features: Sample to read the scores from.

        Returns:
            Dictionary mapping each SCL factor name to its score.
        """
        return {name: getattr(features, name) for name in self._SCL_FIELDS}

    def _calculate_multi_factor(self, scl_features: Dict[str, float]) -> float:
        """
        Compute the share of SCL factors that are severe.

        Args:
            scl_features: SCL factor scores keyed by factor name.

        Returns:
            Number of scores above the severe threshold divided by the number
            of scores; 0.0 when the dictionary is empty.
        """
        if not scl_features:
            # Nothing to count; avoid dividing by zero.
            return 0.0
        severe_count = sum(
            1 for score in scl_features.values() if score > self._SEVERE_THRESHOLD
        )
        return severe_count / len(scl_features)

    def _map_with_default(self, mapping_name: str, key: Any) -> Any:
        """Look up ``key`` in a mapping table, falling back to the entry for 0."""
        table = self.feature_mapping[mapping_name]
        return table.get(key, table[0])

    def _build_row(self, features: Features, scl_features: Dict[str, float],
                   use_defaults: bool) -> Dict[str, Any]:
        """
        Build the output row for one sample.

        Args:
            features: Sample being converted.
            scl_features: SCL factor scores of the sample.
            use_defaults: When True, every categorical lookup uses its default
                key (0 for the coded features, False for the boolean ones)
                instead of the value stored on the sample.

        Returns:
            Dictionary of column name to value, label column last.

        Raises:
            KeyError: If a mapping table or a required key is missing.
        """
        if use_defaults:
            father_key = mother_key = economic_key = 0
            counseling_key = attendance_key = warning_key = False
        else:
            father_key = features.father_parenting_style
            mother_key = features.mother_parenting_style
            economic_key = features.self_assessed_family_economic_condition
            counseling_key = features.history_of_psychological_counseling
            attendance_key = features.absenteeism_above_average
            warning_key = features.academic_warning

        mapping = self.feature_mapping
        return {
            '父亲教养方式数字化': self._map_with_default('父亲教养方式', father_key),
            '母亲教养方式数字化': self._map_with_default('母亲教养方式', mother_key),
            '自评家庭经济条件数字化': self._map_with_default('家庭经济条件', economic_key),
            '有无心理治疗（咨询）史数字化': mapping['心理咨询史'][counseling_key],
            '强迫症状数字化': features.obsessive_compulsive / self._SCL_SCALE,
            '人际关系敏感数字化': features.interpersonal_sensitivity / self._SCL_SCALE,
            '抑郁数字化': features.depression / self._SCL_SCALE,
            '多因子症状': self._calculate_multi_factor(scl_features),
            '出勤情况数字化': mapping['出勤情况'][attendance_key],
            '学业情况数字化': mapping['学业情况'][warning_key],
            self.config['features']['label_name']: features.label,
        }

    def create_feature_df(self, features_list: List[Features]) -> pd.DataFrame:
        """
        Build the feature DataFrame for a list of samples.

        If a lookup for a sample raises ``KeyError``, the error is logged and
        the row is rebuilt with the default categorical values; the numeric
        scores and the label still come from the sample.

        Args:
            features_list: Samples to convert.

        Returns:
            DataFrame with one row per sample.

        Raises:
            KeyError: If even the default lookups cannot be resolved.
        """
        rows = []
        for features in features_list:
            scl_features = self._process_scl_features(features)
            try:
                row = self._build_row(features, scl_features, use_defaults=False)
            except KeyError as e:
                logging.error("Invalid feature value: %s", e)
                # Continue with default values for the categorical features.
                row = self._build_row(features, scl_features, use_defaults=True)
            rows.append(row)
        return pd.DataFrame(rows)
class FeatureWeightApplier:
    """Scale feature columns by their weights relative to the largest weight."""

    @staticmethod
    def apply_weights(df: pd.DataFrame, feature_names: List[str],
                      feature_weights: List[float],
                      label_name: str = '类别') -> pd.DataFrame:
        """
        Multiply each feature column by its weight divided by the largest weight.

        Args:
            df: Input frame holding the feature columns and the label column.
            feature_names: Columns to weight, in the same order as the weights.
            feature_weights: One weight per entry of ``feature_names``.
            label_name: Name of the label column carried over unchanged.
                Defaults to '类别', the column name this method always used.

        Returns:
            New DataFrame with the weighted feature columns followed by the
            label column. The result has a fresh default index.

        Raises:
            ValueError: If the number of weights differs from the number of
                feature names, or if ``feature_weights`` is empty.
            KeyError: If a feature column or the label column is missing.
        """
        if len(feature_names) != len(feature_weights):
            # Without this check a single weight would silently broadcast
            # across every column.
            raise ValueError(
                f"Expected {len(feature_names)} feature weights, "
                f"got {len(feature_weights)}"
            )

        max_weight = max(feature_weights)
        scaled_weights = np.array([w / max_weight for w in feature_weights])

        labels = df[label_name].values
        weighted = df[feature_names].values * scaled_weights

        result_df = pd.DataFrame(weighted, columns=feature_names)
        result_df[label_name] = labels
        return result_df
class FeatureNormalizer:
    """
    Standardize features group by group and persist the fitted scalers.

    Attributes:
        config: Configuration dictionary.
        feature_groups: Group name -> column indices, resolved from
            ``config['features']['groups']`` against
            ``config['features']['feature_names']``.
        scalers: Group name -> fitted scaler; empty until ``fit_transform``
            or ``load`` has run.
    """

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config
        self.feature_groups = {}
        self.scalers = {}
        feature_names = config['features']['feature_names']
        for group_name, features in config['features']['groups'].items():
            self.feature_groups[group_name] = [
                feature_names.index(feature) for feature in features
            ]

    @staticmethod
    def _writable_copy(features_data: np.ndarray) -> np.ndarray:
        """Return a copy of the input that can hold scaled (float) values."""
        if features_data.dtype.kind in 'biu':
            # Writing scaled floats into a bool/int array would truncate them.
            return features_data.astype(np.float64)
        return features_data.copy()

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """Create the parent directory of ``path`` when it has one."""
        directory = os.path.dirname(path)
        # A bare filename has an empty dirname, which os.makedirs rejects.
        if directory:
            os.makedirs(directory, exist_ok=True)

    def fit_transform(self, features_data: np.ndarray) -> np.ndarray:
        """
        Fit one scaler per feature group and return the scaled data.

        Args:
            features_data: 2-D array whose columns follow the configured
                feature names.

        Returns:
            Copy of the input with every grouped column standardized;
            columns that belong to no group are left unchanged.
        """
        features_scaled = self._writable_copy(features_data)
        for group_name, indices in self.feature_groups.items():
            scaler = StandardScaler()
            self.scalers[group_name] = scaler
            features_scaled[:, indices] = scaler.fit_transform(features_data[:, indices])
        return features_scaled

    def transform(self, features_data: np.ndarray) -> np.ndarray:
        """
        Scale data with the previously fitted or loaded scalers.

        Args:
            features_data: 2-D array whose columns follow the configured
                feature names.

        Returns:
            Copy of the input with every grouped column scaled.

        Raises:
            KeyError: If no scaler is available for a configured group.
        """
        features_scaled = self._writable_copy(features_data)
        for group_name, indices in self.feature_groups.items():
            features_scaled[:, indices] = self.scalers[group_name].transform(
                features_data[:, indices]
            )
        return features_scaled

    def save(self, path: str) -> None:
        """
        Pickle the scalers to ``path``, creating its parent directory if needed.

        Args:
            path: Destination file.
        """
        self._ensure_parent_dir(path)
        with open(path, 'wb') as f:
            pickle.dump(self.scalers, f)

    def load(self, path: str) -> None:
        """
        Load pickled scalers from ``path``.

        When ``path`` does not exist, the file configured under
        ``config['paths']['normalizer']['default']`` (resolved relative to the
        parent of this module's directory) is copied to ``path`` first.

        Args:
            path: File to load the scalers from.

        Raises:
            FileNotFoundError: If neither ``path`` nor the default file exists.
        """
        if not os.path.exists(path):
            default_path = os.path.join(
                os.path.dirname(__file__),
                '..',
                self.config['paths']['normalizer']['default']
            )
            if not os.path.exists(default_path):
                raise FileNotFoundError(
                    f"No normalizer found in either path: {path} or {default_path}"
                )
            self._ensure_parent_dir(path)
            shutil.copyfile(default_path, path)
            logging.info(
                "Copied normalizer from default path: %s to %s", default_path, path
            )

        # NOTE: pickle executes code on load; only point this at trusted files.
        with open(path, 'rb') as f:
            self.scalers = pickle.load(f)
def normalize_features(df: pd.DataFrame, feature_names: List[str],
                       is_train: bool, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Entry point for feature normalization.

    In training mode the normalizer is fitted on the data and, when a
    ``paths.normalizer.train`` entry is configured, saved there. Otherwise the
    normalizer is loaded from that path and applied to the data.

    Args:
        df: Frame holding the feature columns and the label column.
        feature_names: Columns to normalize.
        is_train: True to fit (and optionally save), False to load and apply.
        config: Configuration dictionary.

    Returns:
        New DataFrame with the normalized feature columns plus the label column.
    """
    raw_values = df[feature_names].values
    label_column = config['features']['label_name']
    label_values = df[label_column].values

    normalizer = FeatureNormalizer(config)
    if not is_train:
        normalizer.load(config['paths']['normalizer']['train'])
        normalized = normalizer.transform(raw_values)
    else:
        normalized = normalizer.fit_transform(raw_values)
        # Saving is optional: skip it when no training path is configured.
        train_path = config.get('paths', {}).get('normalizer', {}).get('train')
        if train_path:
            normalizer.save(train_path)

    result_df = pd.DataFrame(normalized, columns=feature_names)
    result_df[label_column] = label_values
    return result_df