Obsidian
Mac
在Mac使用终端命令安装DMG
终端无法访问外挂硬盘
小诗词
SSH 无密码登录及 Rsync 同步配置笔记
Cron专题
at 命令笔记
Kostal 服务器密码
Docker 安装指南
MySQL Workbench 数据库迁移:哪些内容不需要迁移?
kostia
配置管理系统(ConfigManager)设计说明
本文档使用 MrDoc 发布
-
+
首页
配置管理系统(ConfigManager)设计说明
# 概述 ## 模块概述 配置管理系统是一个模块化、可扩展的 Python 库,用于加载、管理和验证应用程序配置。它支持从 YAML 文件加载多种配置类型(如数据库、RSA 密钥、API 密钥),提供动态注册、热加载和类型安全的配置访问接口。系统旨在简化配置管理,降低硬编码依赖,并支持新配置类型的快速扩展。 ### 核心功能 - **配置加载**:从 YAML 文件加载配置,支持多类型配置(如 `databases`、`rsa`、`api_keys`)。 - **动态注册**:通过 `ConfigRegistry` 动态注册配置类型,简化新类型添加。 - **类型安全**:使用 `ConfigModels` 定义强类型配置(如 `DatabaseConfig`、`RsaConfig`),确保数据一致性。 - **配置验证**:通过 `ConfigValidator` 验证配置有效性,检查字段完整性和跨配置一致性(如端口冲突)。 - **配置访问**:通过 `ConfigAccessor` 提供统一接口,按类型或路径访问配置。 - **热加载**:支持运行时重新加载配置文件,动态更新配置。 ### 应用场景 - **多环境配置**:管理开发、测试、生产环境的数据库、密钥等配置。 - **动态扩展**:快速添加新配置类型(如 OAuth 认证)。 - **运行时更新**:支持配置文件修改后实时生效,无需重启应用。 ### 调用方法 系统的核心入口是 `ConfigurationManager`,通过 `ConfigAccessor` 提供简洁的访问接口。以下是典型调用流程: 1. **初始化并加载配置**: ```python from configuration_manager import ConfigurationManager from config_accessor import ConfigAccessor config_manager = ConfigurationManager() config_manager.load_config("config.yaml") accessor = ConfigAccessor(config_manager) ``` 2. **访问配置**: ```python # 获取数据库配置 mysql_config = accessor.get_database("mysql") print(mysql_config.to_dict()) # 获取 RSA 配置 rsa_config = accessor.get_rsa() print(rsa_config.to_dict()) # 按路径访问 prod_key = accessor.get_by_path("api_keys.production.key") print(prod_key) ``` 3. **热加载配置**: ```python config_manager.reload_config("config.yaml") updated_key = accessor.get_by_path("api_keys.production.key") print(updated_key) ``` ### 配置文件格式 系统使用 YAML 文件(`config.yaml`)存储配置,示例: ```yaml databases: mysql: host: localhost port: 3306 username: user password: pass database: mydb mongodb: host: localhost port: 27017 uri: mongodb://user:pass@localhost:27017/mydb rsa: public_key: | -----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1234567890abcdef ghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ7890== -----END PUBLIC KEY----- private_key: | -----BEGIN PRIVATE KEY----- MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBA1234567890 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ -----END PRIVATE KEY----- api_keys: production: key: abc123 secret: xyz789 staging: key: def456 secret: uvw012 ``` # 子模块功能 ## 子模块功能 配置管理系统由以下六个子模块组成,每个模块职责明确,协同工作实现配置管理。 ### 1. ConfigLoader **功能**: - 从 YAML 文件加载配置,解析为 Python 字典。 - 处理文件读取错误(例如文件不存在、无效 YAML 格式)。 - 提供原始配置数据的访问接口。 **实现**: - 使用 `PyYAML` 解析 YAML 文件。 - 方法: - `load_from_file(file_path: str) -> Dict[str, Any]`:加载 YAML 文件。 - `get_config() -> Dict[str, Any]`:获取加载的配置数据。 - 异常处理:抛出 `FileNotFoundError`(文件不存在)、`yaml.YAMLError`(无效 YAML)。 **调用示例**: ```python loader = ConfigLoader() config_data = loader.load_from_file("config.yaml") print(config_data) ``` ### 2. ConfigModels **功能**: - 定义强类型的配置模型(`DatabaseConfig`、`RsaConfig`、`ApiKeyConfig`),继承自 `Config` 基类。 - 提供数据序列化(`to_dict`)和反序列化(`from_dict`)方法。 - 实现配置验证逻辑(`validate`),确保字段完整性和格式正确性。 **实现**: - **Config 基类**: - 定义抽象方法 `validate` 和 `to_dict`。 - 提供 `from_dict` 模板方法。 - **DatabaseConfig**: - 字段:`host`、`port`、`username`、`password`、`database`、`uri`(可选)、`options`(可选)。 - 验证:确保 `host` 和 `port` 非空,`uri` 格式正确。 - **RsaConfig**: - 字段:`public_key`、`private_key`。 - 验证:确保密钥为有效 PEM 格式。 - **ApiKeyConfig**: - 字段:`keys`(环境到密钥对的映射)。 - 验证:确保每个环境包含 `key` 和 `secret`。 **调用示例**: ```python data = {'host': 'localhost', 'port': 3306} db_config = DatabaseConfig.from_dict(data) db_config.validate() print(db_config.to_dict()) ``` ### 3. ConfigRegistry **功能**: - 动态注册和创建配置类型,支持扩展新类型。 - 维护配置类型(例如 `databases`、`rsa`)到配置类(`DatabaseConfig`、`RsaConfig`)的映射。 - 提供类型安全的配置对象创建接口。 **实现**: - 使用字典存储类型映射。 - 方法: - `register(config_type: str, config_class: Type[Config])`:注册配置类型。 - `create(config_type: str, data: Dict[str, Any]) -> Config`:创建配置对象。 - `get_registered_types() -> list[str]`:获取注册类型列表。 - 异常处理:抛出 `ValueError`(无效类型或未注册)、`ConfigValidationError`(配置数据无效)。 **调用示例**: ```python ConfigRegistry.register('databases', DatabaseConfig) db_config = ConfigRegistry.create('databases', {'host': 'localhost', 'port': 3306}) print(db_config.to_dict()) ``` ### 4. ConfigurationManager **功能**: - 核心管理类,整合 `ConfigLoader`、`ConfigRegistry` 和 `ConfigModels`。 - 加载配置,管理多数据库配置,切换数据库类型。 - 支持热加载,动态更新配置。 **实现**: - 存储配置:`database_configs`(多数据库)、`rsa_config`、`api_key_config`、`current_database`。 - 方法: - `load_config(file_path: str)`:加载 YAML 配置。 - `reload_config(file_path: str)`:清空并重新加载配置。 - `get_database_config(db_type: str = None) -> DatabaseConfig`:获取数据库配置。 - `get_rsa_config() -> Optional[RsaConfig]`:获取 RSA 配置。 - `get_api_key_config() -> Optional[ApiKeyConfig]`:获取 API 密钥配置。 - `switch_database(db_type: str)`:切换当前数据库。 - `get_available_databases() -> list[str]`:获取可用数据库类型。 - 异常处理:抛出 `FileNotFoundError`、`yaml.YAMLError`、`ConfigValidationError`、`ValueError`。 **调用示例**: ```python config_manager = ConfigurationManager() config_manager.load_config("config.yaml") mysql_config = config_manager.get_database_config("mysql") config_manager.reload_config("config.yaml") ``` ### 5. ConfigValidator **功能**: - 验证配置有效性,确保字段完整性和跨配置一致性。 - 检查特定配置类型的规则(如数据库端口冲突、RSA 密钥格式)。 **实现**: - 方法: - `validate(config: Any, config_type: str)`:验证配置对象。 - 支持类型:`DatabaseConfig`(检查 `host`、`port`)、`RsaConfig`(检查 PEM 格式)、`ApiKeyConfig`(检查 `key` 和 `secret`)。 - 跨配置验证:检查多数据库配置的端口冲突。 - 异常处理:抛出 `ConfigValidationError`(验证失败)。 **调用示例**: ```python validator = ConfigValidator() db_config = DatabaseConfig(host='localhost', port=3306) validator.validate(db_config, 'databases') ``` ### 6. ConfigAccessor **功能**: - 提供简洁的配置访问接口,封装 `ConfigurationManager` 的复杂性。 - 支持按类型或路径访问配置。 **实现**: - 方法: - `get_database(db_type: str = None) -> DatabaseConfig`:获取数据库配置。 - `get_rsa() -> RsaConfig`:获取 RSA 配置。 - `get_api_keys() -> ApiKeyConfig`:获取 API 密钥配置。 - `get_by_path(path: str) -> Any`:按路径访问配置(例如 `api_keys.production.key`)。 - `get_available_databases() -> list[str]`:获取可用数据库类型。 - 异常处理:抛出 `ValueError`(无效类型或路径)、`ConfigValidationError`(访问错误)。 **调用示例**: ```python accessor = ConfigAccessor(config_manager) mysql_config = accessor.get_database("mysql") prod_key = accessor.get_by_path("api_keys.production.key") ``` # 设计优点 ## 设计优点 配置管理系统的设计具有以下优势: 1. **模块化**: - 每个子模块(`ConfigLoader`、`ConfigModels` 等)职责单一,遵循单一职责原则。 - 模块间通过明确接口交互,降低耦合度,便于维护和测试。 2. **可扩展性**: - `ConfigRegistry` 支持动态注册新配置类型,添加新类型(如 `OAuthConfig`)无需修改核心逻辑。 - `ConfigModels` 的基类设计允许快速定义新配置模型。 3. **类型安全**: - `ConfigModels` 使用强类型配置,结合 `from_dict` 和 `validate`,确保数据一致性。 - `ConfigRegistry` 和 `ConfigAccessor` 提供类型检查,减少运行时错误。 4. **健壮的错误处理**: - 每个模块包含详细的异常处理(`FileNotFoundError`、`yaml.YAMLError`、`ConfigValidationError`、`ValueError`)。 - 日志记录(`logging`)提供清晰的错误信息,便于调试。 5. **热加载支持**: - `ConfigurationManager.reload_config` 允许运行时更新配置,适合动态环境(如生产环境密钥轮换)。 - 热加载过程清空旧配置,确保一致性。 6. **统一访问接口**: - `ConfigAccessor` 提供简洁的 API,屏蔽底层复杂性,支持按类型和路径访问。 - 路径访问(`get_by_path`)灵活,适用于动态配置查询。 7. **跨配置验证**: - `ConfigValidator` 检查跨配置一致性(如端口冲突),提高系统可靠性。 # 设计缺点与改进方向 ## 设计缺点 1. **单一文件加载**: - 当前 `ConfigLoader` 仅支持单一 YAML 文件,不支持多文件合并或优先级配置。 - **改进**:扩展 `ConfigLoader` 支持多文件加载,合并配置: ```python def load_from_files(self, file_paths: List[str]) -> Dict[str, Any]: config = {} for path in file_paths: config.update(self.load_from_file(path)) return config ``` 2. **线程安全性不足**: - `ConfigurationManager` 和 `ConfigRegistry` 未考虑多线程访问,可能导致热加载时的竞争条件。 - **改进**:添加线程锁: ```python from threading import Lock class ConfigurationManager: def __init__(self): self.lock = Lock() def reload_config(self, file_path: str): with self.lock: self.database_configs.clear() self.load_config(file_path) ``` 3. **性能瓶颈**: - 热加载每次重新解析和验证全部配置,对于大规模配置可能影响性能。 - **改进**:缓存文件内容,比较变更后仅更新修改部分。 4. **安全问题**: - 敏感数据(如 API 密钥、RSA 私钥)存储在明文 YAML 文件中,存在泄露风险。 - **改进**:集成密钥管理服务(如 AWS Secrets Manager): ```python def load_secrets(self): secrets = boto3.client('secretsmanager').get_secret_value(SecretId='my-secret') return json.loads(secrets['SecretString']) ``` 5. **有限的配置类型支持**: - 当前仅支持数据库、RSA、API 密钥配置,缺少对复杂嵌套配置的支持。 - **改进**:扩展 `ConfigModels` 支持递归配置结构。 6. **缺乏通知机制**: - 热加载后,业务代码无法自动感知配置变更。 - **改进**:添加回调机制: ```python def register_callback(self, callback: Callable[[], None]): self.callbacks.append(callback) def reload_config(self, file_path: str): self.load_config(file_path) for callback in self.callbacks: callback() ``` 7. **测试覆盖有限**: - 当前测试依赖手动修改文件,缺乏自动化测试框架。 - **改进**:使用 `pytest` 编写单元测试,覆盖边界和错误场景。 ## 改进优先级 - **高优先级**:线程安全、密钥管理、多文件加载。 - **中优先级**:性能优化、通知机制。 - **低优先级**:复杂配置支持、自动化测试。 # 使用示例 ## 使用示例 以下是配置管理系统的完整使用示例,展示核心功能和测试用例。 ### 示例代码 ```python from configuration_manager import ConfigurationManager from config_accessor import ConfigAccessor import time # 初始化配置管理器 config_manager = ConfigurationManager() config_manager.load_config("config.yaml") accessor = ConfigAccessor(config_manager) # 获取可用数据库 print("Available databases:", accessor.get_available_databases()) # 获取 MySQL 配置 mysql_config = accessor.get_database("mysql") print("MySQL Config:", mysql_config.to_dict()) # 获取 RSA 配置 rsa_config = accessor.get_rsa() print("RSA Config:", rsa_config.to_dict()) # 获取 API 密钥配置 api_key_config = accessor.get_api_keys() print("API Key Config:", api_key_config.to_dict()) # 按路径访问 prod_key = accessor.get_by_path("api_keys.production.key") print("Production API Key:", prod_key) # 测试数据库切换 config_manager.switch_database("mongodb") mongodb_config = accessor.get_database() print("MongoDB Config:", mongodb_config.to_dict()) # 测试热加载 print("\nPlease modify config.yaml (e.g., change api_keys.production.key to 'newkey123')") print("Waiting 5 seconds...") time.sleep(5) config_manager.reload_config("config.yaml") updated_key = accessor.get_by_path("api_keys.production.key") print("Updated Production API Key:", updated_key) # 测试错误场景 try: accessor.get_by_path("api_keys.invalid.key") except ValueError as e: print(f"Error: {e}") ``` ### 测试用例 以下是通过 `main.py` 执行的测试用例,覆盖核心功能和错误场景: 1. **ConfigLoader**: - 加载 `config.yaml`,验证包含 `databases`、`rsa`、`api_keys`。 - 加载 `invalid.yaml`,捕获 `FileNotFoundError`。 2. **ConfigModels**: - 创建 `DatabaseConfig`,验证字段;测试缺失 `host`。 - 创建 `RsaConfig`,验证 PEM 格式。 - 创建 `ApiKeyConfig`,验证密钥对;测试缺失 `secret`。 3. **ConfigRegistry**: - 注册 `DatabaseConfig`、`RsaConfig`、`ApiKeyConfig`,验证类型列表。 - 创建配置,验证实例;测试未注册类型。 4. **ConfigurationManager**: - 加载配置,验证数据库类型。 - 切换数据库,验证配置;测试无效数据库。 5. **ConfigValidator**: - 验证所有配置类型;测试端口冲突。 6. **ConfigAccessor**: - 获取配置,验证内容;测试无效路径。 7. **热加载**: - 修改 `api_keys.production.key`,验证更新;测试无效文件。 ### 运行测试 1. 确保安装 PyYAML:`pip install pyyaml`。 2. 保存所有文件:`config_loader.py`、`config_models.py`、`config_registry.py`、`configuration_manager.py`、`config_validator.py`、`config_accessor.py`、`main.py`、`config.yaml`。 3. 运行测试:`python main.py`。 4. 热加载测试:修改 `config.yaml`(例如将 `api_keys.production.key` 改为 `newkey123`)。 ### 预期输出 ``` Starting Configuration Management System Tests === Testing ConfigLoader === Config data loaded: dict_keys(['databases', 'rsa', 'api_keys']) Successfully caught FileNotFoundError for invalid.yaml === Testing ConfigModels === DatabaseConfig: {'host': 'localhost', 'port': 3306, ...} Successfully caught ConfigValidationError for invalid DatabaseConfig ... === Testing Hot Reloading === Initial production API key: abc123 Please modify config.yaml (e.g., change api_keys.production.key to 'newkey123') and save. Waiting 5 seconds for manual config change... Updated production API key: newkey123 Successfully caught FileNotFoundError for invalid.yaml All tests completed! ``` # 附件:完成代码 ### config.yaml ```yml databases: mysql: host: localhost port: 3306 username: user password: pass database: mydb mongodb: host: localhost port: 27017 uri: mongodb://user:pass@localhost:27017/mydb rsa: public_key: | -----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1234567890abcdef ghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ7890== -----END PUBLIC KEY----- private_key: | -----BEGIN PRIVATE KEY----- MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBA1234567890 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ -----END PRIVATE KEY----- api_keys: production: key: abc123 secret: xyz789 staging: key: def456 secret: uvw012 ``` ### main.py (用来测试各个模块) ```python import logging import time import os from typing import Dict from config_loader import ConfigLoader from config_models import DatabaseConfig, RsaConfig, ApiKeyConfig, ConfigValidationError from config_registry import ConfigRegistry from configuration_manager import ConfigurationManager from config_validator import ConfigValidator from config_accessor import ConfigAccessor # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def print_section(title: str): """打印测试section标题""" print(f"\n=== {title} ===") def test_config_loader(): """测试 ConfigLoader 功能""" print_section("Testing ConfigLoader") loader = ConfigLoader() # 测试加载有效 YAML 文件 try: config_data = loader.load_from_file("config.yaml") logger.info("Successfully loaded config.yaml") print("Config data loaded:", config_data.keys()) except Exception as e: logger.error(f"Failed to load config.yaml: {str(e)}") print(f"Error: {str(e)}") # 测试加载无效文件路径 try: loader.load_from_file("invalid.yaml") print("Error: Should have failed to load invalid.yaml") except FileNotFoundError as e: logger.info(f"Correctly caught FileNotFoundError: {str(e)}") print("Successfully caught FileNotFoundError for invalid.yaml") def test_config_models(): """测试 ConfigModels 功能""" print_section("Testing ConfigModels") # 测试 DatabaseConfig try: db_data = { 'host': 'localhost', 'port': 3306, 'username': 'user', 'password': 'pass', 'database': 'mydb' } db_config = DatabaseConfig.from_dict(db_data) db_config.validate() logger.info("Successfully created and validated DatabaseConfig") print("DatabaseConfig:", db_config.to_dict()) except ConfigValidationError as e: logger.error(f"Failed to validate DatabaseConfig: {str(e)}") print(f"Error: {str(e)}") # 测试无效 DatabaseConfig(缺失 host) try: invalid_db_data = {'port': 3306} db_config = DatabaseConfig.from_dict(invalid_db_data) print("Error: Should have failed to validate invalid DatabaseConfig") except ConfigValidationError as e: logger.info(f"Correctly caught ConfigValidationError: {str(e)}") print("Successfully caught ConfigValidationError for invalid DatabaseConfig") # 测试 RsaConfig try: rsa_data = { 'public_key': '-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1234567890abcdef\n-----END PUBLIC KEY-----', 'private_key': '-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBA1234567890\n-----END PRIVATE KEY-----' } rsa_config = RsaConfig.from_dict(rsa_data) rsa_config.validate() logger.info("Successfully created and validated RsaConfig") print("RsaConfig:", rsa_config.to_dict()) except ConfigValidationError as e: logger.error(f"Failed to validate RsaConfig: {str(e)}") print(f"Error: {str(e)}") # 测试 ApiKeyConfig try: api_key_data = { 'production': {'key': 'abc123', 'secret': 'xyz789'}, 'staging': {'key': 'def456', 'secret': 'uvw012'} } api_key_config = ApiKeyConfig.from_dict(api_key_data) api_key_config.validate() logger.info("Successfully created and validated ApiKeyConfig") print("ApiKeyConfig:", api_key_config.to_dict()) except ConfigValidationError as e: logger.error(f"Failed to validate ApiKeyConfig: {str(e)}") print(f"Error: {str(e)}") # 测试无效 ApiKeyConfig(缺失 secret) try: invalid_api_key_data = {'production': {'key': 'abc123'}} api_key_config = ApiKeyConfig.from_dict(invalid_api_key_data) print("Error: Should have failed to validate invalid ApiKeyConfig") except ConfigValidationError as e: logger.info(f"Correctly caught ConfigValidationError: {str(e)}") print("Successfully caught ConfigValidationError for invalid ApiKeyConfig") def test_config_registry(): """测试 ConfigRegistry 功能""" print_section("Testing ConfigRegistry") # 注册配置类型 try: ConfigRegistry.register('test_databases', DatabaseConfig) ConfigRegistry.register('test_rsa', RsaConfig) ConfigRegistry.register('test_api_keys', ApiKeyConfig) logger.info("Successfully registered config types") print("Registered types:", ConfigRegistry.get_registered_types()) except ValueError as e: logger.error(f"Failed to register config types: {str(e)}") print(f"Error: {str(e)}") # 测试创建配置 try: db_data = {'host': 'localhost', 'port': 3306} db_config = ConfigRegistry.create('test_databases', db_data) logger.info("Successfully created DatabaseConfig via ConfigRegistry") print("Created DatabaseConfig:", db_config.to_dict()) except ConfigValidationError as e: logger.error(f"Failed to create DatabaseConfig: {str(e)}") print(f"Error: {str(e)}") # 测试未注册类型 try: ConfigRegistry.create('invalid_type', {}) print("Error: Should have failed to create config for unregistered type") except ValueError as e: logger.info(f"Correctly caught ValueError: {str(e)}") print("Successfully caught ValueError for unregistered type") def test_configuration_manager(): """测试 ConfigurationManager 功能""" print_section("Testing ConfigurationManager") config_manager = ConfigurationManager() # 测试加载配置 try: config_manager.load_config("config.yaml") logger.info("Successfully loaded configuration") print("Available databases:", config_manager.get_available_databases()) except Exception as e: logger.error(f"Failed to load configuration: {str(e)}") print(f"Error: {str(e)}") # 测试获取数据库配置 try: mysql_config = config_manager.get_database_config("mysql") logger.info("Successfully retrieved MySQL config") print("MySQL Config:", mysql_config.to_dict()) except ValueError as e: logger.error(f"Failed to retrieve MySQL config: {str(e)}") print(f"Error: {str(e)}") # 测试切换数据库 try: config_manager.switch_database("mongodb") mongodb_config = config_manager.get_database_config() logger.info("Successfully switched to MongoDB") print("MongoDB Config:", mongodb_config.to_dict()) except ValueError as e: logger.error(f"Failed to switch to MongoDB: {str(e)}") print(f"Error: {str(e)}") # 测试无效数据库类型 try: config_manager.get_database_config("invalid") print("Error: Should have failed to retrieve invalid database config") except ValueError as e: logger.info(f"Correctly caught ValueError: {str(e)}") print("Successfully caught ValueError for invalid database type") def test_config_validator(): """测试 ConfigValidator 功能""" print_section("Testing ConfigValidator") validator = ConfigValidator() config_manager = ConfigurationManager() config_manager.load_config("config.yaml") # 测试验证数据库配置 try: validator.validate(config_manager.database_configs, 'databases') logger.info("Successfully validated database configurations") print("Database configurations validated") except ConfigValidationError as e: logger.error(f"Failed to validate database configurations: {str(e)}") print(f"Error: {str(e)}") # 测试验证 RSA 配置 try: rsa_config = config_manager.get_rsa_config() if rsa_config: validator.validate(rsa_config, 'rsa') logger.info("Successfully validated RSA configuration") print("RSA configuration validated") except ConfigValidationError as e: logger.error(f"Failed to validate RSA configuration: {str(e)}") print(f"Error: {str(e)}") # 测试验证 API 密钥配置 try: api_key_config = config_manager.get_api_key_config() if api_key_config: validator.validate(api_key_config, 'api_keys') logger.info("Successfully validated API key configuration") print("API key configuration validated") except ConfigValidationError as e: logger.error(f"Failed to validate API key configuration: {str(e)}") print(f"Error: {str(e)}") # 测试端口冲突 try: conflicting_configs = { 'db1': DatabaseConfig(host='localhost', port=3306), 'db2': DatabaseConfig(host='localhost', port=3306) } validator.validate(conflicting_configs, 'databases') print("Error: Should have failed to validate conflicting ports") except ConfigValidationError as e: logger.info(f"Correctly caught ConfigValidationError: {str(e)}") print("Successfully caught ConfigValidationError for conflicting ports") def test_config_accessor(): """测试 ConfigAccessor 功能""" print_section("Testing ConfigAccessor") config_manager = ConfigurationManager() config_manager.load_config("config.yaml") accessor = ConfigAccessor(config_manager) # 测试获取数据库配置 try: mysql_config = accessor.get_database("mysql") logger.info("Successfully retrieved MySQL config via ConfigAccessor") print("MySQL Config:", mysql_config.to_dict()) except ValueError as e: logger.error(f"Failed to retrieve MySQL config: {str(e)}") print(f"Error: {str(e)}") # 测试获取 RSA 配置 try: rsa_config = accessor.get_rsa() logger.info("Successfully retrieved RSA config via ConfigAccessor") print("RSA Config:", rsa_config.to_dict()) except ValueError as e: logger.error(f"Failed to retrieve RSA config: {str(e)}") print(f"Error: {str(e)}") # 测试获取 API 密钥配置 try: api_key_config = accessor.get_api_keys() logger.info("Successfully retrieved API key config via ConfigAccessor") print("API Key Config:", api_key_config.to_dict()) except ValueError as e: logger.error(f"Failed to retrieve API key config: {str(e)}") print(f"Error: {str(e)}") # 测试按路径访问 try: prod_key = accessor.get_by_path("api_keys.production.key") logger.info("Successfully retrieved production API key via path") print("Production API Key:", prod_key) except ValueError as e: logger.error(f"Failed to retrieve production API key: {str(e)}") print(f"Error: {str(e)}") # 测试无效路径 try: accessor.get_by_path("api_keys.invalid.key") print("Error: Should have failed to retrieve invalid path") except ValueError as e: logger.info(f"Correctly caught ValueError: {str(e)}") print("Successfully caught ValueError for invalid path") def test_hot_reloading(): """测试热加载功能""" print_section("Testing Hot Reloading") config_manager = ConfigurationManager() accessor = ConfigAccessor(config_manager) # 初始加载 try: config_manager.load_config("config.yaml") api_key_config = accessor.get_api_keys() initial_key = api_key_config.to_dict()['production']['key'] logger.info(f"Initial production API key: {initial_key}") print(f"Initial production API key: {initial_key}") except Exception as e: logger.error(f"Failed to load initial configuration: {str(e)}") print(f"Error: {str(e)}") # 提示修改配置文件 print("\nPlease modify config.yaml (e.g., change api_keys.production.key to 'newkey123') and save.") print("Waiting 5 seconds for manual config change...") time.sleep(5) # 测试热加载 try: config_manager.reload_config("config.yaml") api_key_config = accessor.get_api_keys() updated_key = api_key_config.to_dict()['production']['key'] logger.info(f"Updated production API key: {updated_key}") print(f"Updated production API key: {updated_key}") except Exception as e: logger.error(f"Failed to reload configuration: {str(e)}") print(f"Error: {str(e)}") # 测试无效文件热加载 try: config_manager.reload_config("invalid.yaml") print("Error: Should have failed to reload invalid.yaml") except FileNotFoundError as e: logger.info(f"Correctly caught FileNotFoundError: {str(e)}") print("Successfully caught FileNotFoundError for invalid.yaml") def main(): """主测试函数""" print("Starting Configuration Management System Tests\n") # 运行所有测试 test_config_loader() test_config_models() test_config_registry() test_configuration_manager() test_config_validator() test_config_accessor() test_hot_reloading() print("\nAll tests completed!") if __name__ == "__main__": main() ``` ### ConfigAccessor.py ```python import logging from typing import Any, Optional from config_models import DatabaseConfig, RsaConfig, ApiKeyConfig, ConfigValidationError from configuration_manager import ConfigurationManager # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ConfigAccessor: """ ConfigAccessor 提供简洁的配置访问接口,封装 ConfigurationManager 的底层实现。 支持获取数据库配置、RSA 配置、API 密钥配置和按路径访问任意配置值。 """ def __init__(self, manager: ConfigurationManager): """ 初始化 ConfigAccessor。 Args: manager (ConfigurationManager): 配置管理器实例。 """ self.manager = manager def get_database(self, db_type: Optional[str] = None) -> DatabaseConfig: """ 获取指定数据库类型的配置,或当前数据库配置。 Args: db_type (Optional[str]): 数据库类型(例如 'mysql')。若为 None,使用当前数据库。 Returns: DatabaseConfig: 指定数据库的配置对象。 Raises: ValueError: 如果数据库类型无效或未配置。 ConfigValidationError: 如果配置无效。 """ logger.info(f"Accessing database configuration for {db_type or 'current database'}") try: config = self.manager.get_database_config(db_type) logger.info(f"Successfully retrieved database configuration for {db_type or self.manager.current_database}") return config except ValueError as e: logger.error(f"Failed to access database configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error accessing database configuration: {str(e)}") raise ConfigValidationError(f"Unexpected error accessing database configuration: {str(e)}") def get_rsa(self) -> RsaConfig: """ 获取 RSA 配置。 Returns: RsaConfig: RSA 配置对象。 Raises: ValueError: 如果 RSA 配置未加载。 ConfigValidationError: 如果访问过程中发生意外错误。 """ logger.info("Accessing RSA configuration") try: config = self.manager.get_rsa_config() if config is None: raise ValueError("RSA configuration not loaded") logger.info("Successfully retrieved RSA configuration") return config except ValueError as e: logger.error(f"Failed to access RSA configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error accessing RSA configuration: {str(e)}") raise ConfigValidationError(f"Unexpected error accessing RSA configuration: {str(e)}") def get_api_keys(self) -> ApiKeyConfig: """ 获取 API 密钥配置。 Returns: ApiKeyConfig: API 密钥配置对象。 Raises: ValueError: 如果 API 密钥配置未加载。 ConfigValidationError: 如果访问过程中发生意外错误。 """ logger.info("Accessing API key configuration") try: config = self.manager.get_api_key_config() if config is None: raise ValueError("API key configuration not loaded") logger.info("Successfully retrieved API key configuration") return config except ValueError as e: logger.error(f"Failed to access API key configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error accessing API key configuration: {str(e)}") raise ConfigValidationError(f"Unexpected error accessing API key configuration: {str(e)}") def get_by_path(self, path: str) -> Any: """ 按路径访问配置值(例如 'databases.mysql.host')。 Args: path (str): 配置路径,使用点号分隔(例如 'databases.mysql.host')。 Returns: Any: 路径对应的配置值。 Raises: ValueError: 如果路径无效或值不存在。 ConfigValidationError: 如果访问过程中发生意外错误。 """ logger.info(f"Accessing configuration by path: {path}") try: keys = path.split('.') if not keys: raise ValueError("Configuration path cannot be empty") config_data = self.manager.loader.get_config() current = config_data for key in keys: if isinstance(current, dict): if key not in current: raise ValueError(f"Configuration path '{path}' is invalid: key '{key}' not found") current = current[key] else: raise ValueError(f"Configuration path '{path}' is invalid: '{key}' is not a dictionary") logger.info(f"Successfully retrieved configuration value for path: {path}") return current except ValueError as e: logger.error(f"Failed to access configuration by path: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error accessing configuration by path: {str(e)}") raise ConfigValidationError(f"Unexpected error accessing configuration by path: {str(e)}") def get_available_databases(self) -> list[str]: """ 获取所有可用数据库类型。 Returns: list[str]: 配置的数据库类型列表。 """ logger.info("Retrieving available database types") databases = self.manager.get_available_databases() logger.info(f"Available databases: {databases}") return databases # 示例使用(用于测试) if __name__ == "__main__": import time from config_loader import ConfigLoader from configuration_manager import ConfigurationManager try: # 初始化 ConfigurationManager 并加载配置 config_manager = ConfigurationManager() config_manager.load_config("config.yaml") # 初始化 ConfigAccessor accessor = ConfigAccessor(config_manager) # 获取初始配置 print("Initial configurations:") print("Available databases:", accessor.get_available_databases()) mysql_config = accessor.get_database("mysql") print("MySQL Config:", mysql_config.to_dict()) rsa_config = accessor.get_rsa() print("RSA Config:", rsa_config.to_dict()) api_key_config = accessor.get_api_keys() print("API Key Config:", api_key_config.to_dict()) prod_key = accessor.get_by_path("api_keys.production.key") print("Production API Key:", prod_key) # 测试热加载 print("\nPlease modify config.yaml (e.g., change api_keys.production.key to 'newkey123') and save.") print("Waiting 5 seconds for manual config change...") time.sleep(5) # 模拟等待用户修改 config.yaml # 重新加载配置 config_manager.reload_config("config.yaml") # 获取更新后的配置 print("\nAfter reload:") print("Available databases:", accessor.get_available_databases()) mysql_config = accessor.get_database("mysql") print("MySQL Config:", mysql_config.to_dict()) rsa_config = accessor.get_rsa() print("RSA Config:", rsa_config.to_dict()) api_key_config = accessor.get_api_keys() print("API Key Config:", api_key_config.to_dict()) prod_key = accessor.get_by_path("api_keys.production.key") print("Production API Key:", prod_key) # 测试无效路径 accessor.get_by_path("api_keys.invalid.key") except (ValueError, ConfigValidationError) as e: print(f"Error: {str(e)}") except Exception as e: print(f"Unexpected Error: {str(e)}") ``` ### ConfigurationManager.py ```python import logging from typing import Dict, Optional from config_loader import ConfigLoader from config_models import DatabaseConfig, RsaConfig, ApiKeyConfig, ConfigValidationError from config_registry import ConfigRegistry # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ConfigurationManager: """ ConfigurationManager 是配置管理的核心类。 整合 ConfigLoader、ConfigRegistry 和 Config Models,提供配置访问和数据库类型切换。 支持热加载配置。 """ def __init__(self): """初始化 ConfigurationManager,包含 ConfigLoader 和配置存储。""" self.loader = ConfigLoader() self.database_configs: Dict[str, DatabaseConfig] = {} # 数据库配置缓存 self.rsa_config: Optional[RsaConfig] = None # RSA 配置 self.api_key_config: Optional[ApiKeyConfig] = None # API 密钥配置 self.current_database: Optional[str] = None # 当前数据库类型 # 注册配置类型 ConfigRegistry.register('databases', DatabaseConfig) ConfigRegistry.register('rsa', RsaConfig) ConfigRegistry.register('api_keys', ApiKeyConfig) def load_config(self, file_path: str) -> None: """ 从 YAML 文件加载配置,使用 ConfigRegistry 映射为配置对象。 Args: file_path (str): YAML 配置文件路径。 Raises: FileNotFoundError: 如果文件不存在。 yaml.YAMLError: 如果 YAML 文件无效。 ConfigValidationError: 如果配置无效。 """ logger.info(f"Loading configuration into ConfigurationManager from {file_path}") try: # 使用 ConfigLoader 加载原始配置 config_data = self.loader.load_from_file(file_path) # 映射数据库配置 databases = config_data.get('databases', {}) if not databases: logger.warning("No database configurations found in the file") for db_type, db_data in databases.items(): try: db_config = ConfigRegistry.create('databases', db_data) self.database_configs[db_type] = db_config logger.info(f"Mapped configuration for database type: {db_type}") except ConfigValidationError as e: logger.error(f"Invalid configuration for {db_type}: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error mapping {db_type}: {str(e)}") raise ConfigValidationError(f"Failed to map {db_type}: {str(e)}") # 映射 RSA 配置 rsa_data = config_data.get('rsa', {}) if rsa_data: try: self.rsa_config = ConfigRegistry.create('rsa', rsa_data) logger.info("Mapped RSA configuration") except ConfigValidationError as e: logger.error(f"Invalid RSA configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error mapping RSA configuration: {str(e)}") raise ConfigValidationError(f"Failed to map RSA configuration: {str(e)}") # 映射 API 密钥配置 api_key_data = config_data.get('api_keys', {}) if api_key_data: try: self.api_key_config = ConfigRegistry.create('api_keys', api_key_data) logger.info("Mapped API key configuration") except ConfigValidationError as e: logger.error(f"Invalid API key configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error mapping API key configuration: {str(e)}") raise ConfigValidationError(f"Failed to map API key configuration: {str(e)}") # 设置默认数据库(如果有) if databases and not self.current_database: self.current_database = next(iter(databases)) logger.info(f"Set default database to: {self.current_database}") except Exception as e: logger.error(f"Failed to load configuration: {str(e)}") raise def reload_config(self, file_path: str) -> None: """ 重新加载配置文件,更新所有配置。 Args: file_path (str): YAML 配置文件路径。 Raises: FileNotFoundError: 如果文件不存在。 yaml.YAMLError: 如果 YAML 文件无效。 ConfigValidationError: 如果配置无效。 """ logger.info(f"Reloading configuration from {file_path}") try: # 清空现有配置 self.database_configs.clear() self.rsa_config = None self.api_key_config = None self.current_database = None logger.info("Cleared existing configurations") # 重新加载配置 self.load_config(file_path) logger.info("Successfully reloaded configuration") except Exception as e: logger.error(f"Failed to reload configuration: {str(e)}") raise def get_database_config(self, db_type: Optional[str] = None) -> DatabaseConfig: """ 获取指定数据库类型或当前数据库的配置。 Args: db_type (Optional[str]): 数据库类型(例如 'mysql')。若为 None,使用当前数据库。 Returns: DatabaseConfig: 指定或当前数据库的配置。 Raises: ValueError: 如果数据库类型无效或未配置。 """ if db_type is None: db_type = self.current_database if not db_type: logger.error("No database type specified and no current database set") raise ValueError("No database type specified and no current database set") config = self.database_configs.get(db_type) if config is None: logger.error(f"No configuration found for database type: {db_type}") raise ValueError(f"No configuration found for database type: {db_type}") logger.info(f"Retrieved configuration for database type: {db_type}") return config def get_rsa_config(self) -> Optional[RsaConfig]: """ 获取 RSA 配置。 Returns: Optional[RsaConfig]: RSA 配置对象,若未配置则返回 None。 """ logger.info("Retrieving RSA configuration") if self.rsa_config is None: logger.warning("RSA configuration not loaded") return self.rsa_config def get_api_key_config(self) -> Optional[ApiKeyConfig]: """ 获取 API 密钥配置。 Returns: Optional[ApiKeyConfig]: API 密钥配置对象,若未配置则返回 None。 """ logger.info("Retrieving API key configuration") if self.api_key_config is None: logger.warning("API key configuration not loaded") return self.api_key_config def switch_database(self, db_type: str) -> None: """ 切换当前数据库类型。 Args: db_type (str): 要切换的数据库类型(例如 'mysql')。 Raises: ValueError: 如果数据库类型未配置。 """ if db_type not in self.database_configs: logger.error(f"Cannot switch to unconfigured database type: {db_type}") raise ValueError(f"Database type not configured: {db_type}") self.current_database = db_type logger.info(f"Switched current database to: {db_type}") def get_available_databases(self) -> list[str]: """ 获取所有可用数据库类型。 Returns: list[str]: 配置的数据库类型列表。 """ return list(self.database_configs.keys()) # 示例使用(用于测试) if __name__ == "__main__": try: # 初始化 ConfigurationManager config_manager = ConfigurationManager() # 加载配置 config_manager.load_config("config.yaml") # 获取可用数据库 print("Available databases:", config_manager.get_available_databases()) # 获取 MySQL 配置 mysql_config = config_manager.get_database_config("mysql") print("MySQL Config:", mysql_config.to_dict()) # 获取 RSA 配置 rsa_config = config_manager.get_rsa_config() if rsa_config: print("RSA Config:", rsa_config.to_dict()) # 获取 API 密钥配置 api_key_config = config_manager.get_api_key_config() if api_key_config: print("API Key Config:", api_key_config.to_dict()) # 测试热加载 print("\nModifying config.yaml and reloading...") # 模拟修改 config.yaml(手动修改文件或在测试中替换) config_manager.reload_config("config.yaml") # 重新获取配置 print("After reload - Available databases:", config_manager.get_available_databases()) mysql_config = config_manager.get_database_config("mysql") print("After reload - MySQL Config:", mysql_config.to_dict()) api_key_config = config_manager.get_api_key_config() if api_key_config: print("After reload - API Key Config:", api_key_config.to_dict()) # 切换到 MongoDB 并获取配置 config_manager.switch_database("mongodb") mongodb_config = config_manager.get_database_config() print("MongoDB Config:", mongodb_config.to_dict()) # 测试无效数据库类型 config_manager.get_database_config("invalid") except Exception as e: print(f"Error: {str(e)}") ``` ### ConfigRegistry.py ```python import logging from typing import Dict, Type, Any from config_models import Config, ConfigValidationError # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ConfigRegistry: """ ConfigRegistry 管理配置类型的注册和创建。 提供动态注册和实例化配置类(Config 子类)的功能。 """ registry: Dict[str, Type[Config]] = {} @classmethod def register(cls, config_type: str, config_class: Type[Config]) -> None: """ 注册配置类型。 Args: config_type (str): 配置类型标识(例如 'databases', 'rsa', 'api_keys')。 config_class (Type[Config]): 配置类(Config 的子类)。 Raises: ValueError: 如果 config_type 已注册或 config_class 无效。 """ logger.info(f"Registering config type: {config_type} with class: {config_class.__name__}") if not issubclass(config_class, Config): logger.error(f"Config class {config_class.__name__} must be a subclass of Config") raise ValueError(f"Config class {config_class.__name__} must be a subclass of Config") if config_type in cls.registry: logger.warning(f"Config type {config_type} is already registered, overwriting") cls.registry[config_type] = config_class logger.info(f"Successfully registered config type: {config_type}") @classmethod def create(cls, config_type: str, data: Dict[str, Any]) -> Config: """ 根据配置类型和数据创建配置对象。 Args: config_type (str): 配置类型标识。 data (Dict[str, Any]): 配置数据。 Returns: Config: 配置类的实例。 Raises: ValueError: 如果 config_type 未注册。 ConfigValidationError: 如果配置数据无效。 """ logger.info(f"Creating config object for type: {config_type}") config_class = cls.registry.get(config_type) if config_class is None: logger.error(f"No config class registered for type: {config_type}") raise ValueError(f"No config class registered for type: {config_type}") try: config = config_class.from_dict(data) logger.info(f"Successfully created config object for type: {config_type}") return config except ConfigValidationError as e: logger.error(f"Failed to create config for {config_type}: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error creating config for {config_type}: {str(e)}") raise ConfigValidationError(f"Unexpected error creating config for {config_type}: {str(e)}") @classmethod def get_registered_types(cls) -> list[str]: """ 获取所有已注册的配置类型。 Returns: list[str]: 注册的配置类型列表。 """ return list(cls.registry.keys()) # 示例使用(用于测试) if __name__ == "__main__": from config_models import DatabaseConfig, RsaConfig, ApiKeyConfig try: # 注册配置类型 ConfigRegistry.register('databases', DatabaseConfig) ConfigRegistry.register('rsa', RsaConfig) ConfigRegistry.register('api_keys', ApiKeyConfig) # 获取注册的类型 print("Registered config types:", ConfigRegistry.get_registered_types()) # 测试创建 DatabaseConfig db_data = { 'host': 'localhost', 'port': 3306, 'username': 'user', 'password': 'pass', 'database': 'mydb' } db_config = ConfigRegistry.create('databases', db_data) print("DatabaseConfig:", db_config.to_dict()) # 测试创建 RsaConfig rsa_data = { 'public_key': '-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1234567890abcdef\n-----END PUBLIC KEY-----', 'private_key': '-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBA1234567890\n-----END PRIVATE KEY-----' } rsa_config = ConfigRegistry.create('rsa', rsa_data) print("RsaConfig:", rsa_config.to_dict()) # 测试创建 ApiKeyConfig api_key_data = { 'production': {'key': 'abc123', 'secret': 'xyz789'}, 'staging': {'key': 'def456', 'secret': 'uvw012'} } api_key_config = ConfigRegistry.create('api_keys', api_key_data) print("ApiKeyConfig:", api_key_config.to_dict()) # 测试未注册类型 ConfigRegistry.create('invalid', {}) except (ValueError, ConfigValidationError) as e: print(f"Error: {str(e)}") except Exception as e: print(f"Unexpected Error: {str(e)}") ``` ### ConfigAccessor.py ```python import logging from typing import Any, Optional from config_models import DatabaseConfig, RsaConfig, ApiKeyConfig, ConfigValidationError from configuration_manager import ConfigurationManager # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ConfigAccessor: """ ConfigAccessor 提供简洁的配置访问接口,封装 ConfigurationManager 的底层实现。 支持获取数据库配置、RSA 配置、API 密钥配置和按路径访问任意配置值。 """ def __init__(self, manager: ConfigurationManager): """ 初始化 ConfigAccessor。 Args: manager (ConfigurationManager): 配置管理器实例。 """ self.manager = manager def get_database(self, db_type: Optional[str] = None) -> DatabaseConfig: """ 获取指定数据库类型的配置,或当前数据库配置。 Args: db_type (Optional[str]): 数据库类型(例如 'mysql')。若为 None,使用当前数据库。 Returns: DatabaseConfig: 指定数据库的配置对象。 Raises: ValueError: 如果数据库类型无效或未配置。 ConfigValidationError: 如果配置无效。 """ logger.info(f"Accessing database configuration for {db_type or 'current database'}") try: config = self.manager.get_database_config(db_type) logger.info(f"Successfully retrieved database configuration for {db_type or self.manager.current_database}") return config except ValueError as e: logger.error(f"Failed to access database configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error accessing database configuration: {str(e)}") raise ConfigValidationError(f"Unexpected error accessing database configuration: {str(e)}") def get_rsa(self) -> RsaConfig: """ 获取 RSA 配置。 Returns: RsaConfig: RSA 配置对象。 Raises: ValueError: 如果 RSA 配置未加载。 ConfigValidationError: 如果访问过程中发生意外错误。 """ logger.info("Accessing RSA configuration") try: config = self.manager.get_rsa_config() if config is None: raise ValueError("RSA configuration not loaded") logger.info("Successfully retrieved RSA configuration") return config except ValueError as e: logger.error(f"Failed to access RSA configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error accessing RSA configuration: {str(e)}") raise ConfigValidationError(f"Unexpected error accessing RSA configuration: {str(e)}") def get_api_keys(self) -> ApiKeyConfig: """ 获取 API 密钥配置。 Returns: ApiKeyConfig: API 密钥配置对象。 Raises: ValueError: 如果 API 密钥配置未加载。 ConfigValidationError: 如果访问过程中发生意外错误。 """ logger.info("Accessing API key configuration") try: config = self.manager.get_api_key_config() if config is None: raise ValueError("API key configuration not loaded") logger.info("Successfully retrieved API key configuration") return config except ValueError as e: logger.error(f"Failed to access API key configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error accessing API key configuration: {str(e)}") raise ConfigValidationError(f"Unexpected error accessing API key configuration: {str(e)}") def get_by_path(self, path: str) -> Any: """ 按路径访问配置值(例如 'databases.mysql.host')。 Args: path (str): 配置路径,使用点号分隔(例如 'databases.mysql.host')。 Returns: Any: 路径对应的配置值。 Raises: ValueError: 如果路径无效或值不存在。 ConfigValidationError: 如果访问过程中发生意外错误。 """ logger.info(f"Accessing configuration by path: {path}") try: keys = path.split('.') if not keys: raise ValueError("Configuration path cannot be empty") config_data = self.manager.loader.get_config() current = config_data for key in keys: if isinstance(current, dict): if key not in current: raise ValueError(f"Configuration path '{path}' is invalid: key '{key}' not found") current = current[key] else: raise ValueError(f"Configuration path '{path}' is invalid: '{key}' is not a dictionary") logger.info(f"Successfully retrieved configuration value for path: {path}") return current except ValueError as e: logger.error(f"Failed to access configuration by path: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error accessing configuration by path: {str(e)}") raise ConfigValidationError(f"Unexpected error accessing configuration by path: {str(e)}") def get_available_databases(self) -> list[str]: """ 获取所有可用数据库类型。 Returns: list[str]: 配置的数据库类型列表。 """ logger.info("Retrieving available database types") databases = self.manager.get_available_databases() logger.info(f"Available databases: {databases}") return databases # 示例使用(用于测试) if __name__ == "__main__": from config_loader import ConfigLoader from configuration_manager import ConfigurationManager try: # 初始化 ConfigurationManager 并加载配置 config_manager = ConfigurationManager() config_manager.load_config("config.yaml") # 初始化 ConfigAccessor accessor = ConfigAccessor(config_manager) # 获取可用数据库 print("Available databases:", accessor.get_available_databases()) # 获取 MySQL 配置 mysql_config = accessor.get_database("mysql") print("MySQL Config:", mysql_config.to_dict()) # 获取 RSA 配置 rsa_config = accessor.get_rsa() print("RSA Config:", rsa_config.to_dict()) # 获取 API 密钥配置 api_key_config = accessor.get_api_keys() print("API Key Config:", api_key_config.to_dict()) # 按路径访问配置 prod_key = accessor.get_by_path("api_keys.production.key") print("Production API Key:", prod_key) # 测试无效路径 accessor.get_by_path("api_keys.invalid.key") except (ValueError, ConfigValidationError) as e: print(f"Error: {str(e)}") except Exception as e: print(f"Unexpected Error: {str(e)}") ``` ### ConfigValidator.py ```python import logging from typing import Dict, List from config_models import DatabaseConfig, RsaConfig, ApiKeyConfig, ConfigValidationError # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ConfigValidator: """ ConfigValidator 负责验证配置的正确性和一致性。 支持单个配置对象的校验和跨配置的校验。 """ def __init__(self): """初始化 ConfigValidator,无需初始状态。""" pass def validate_database(self, config: DatabaseConfig, db_type: str) -> None: """ 验证单个数据库配置。 Args: config (DatabaseConfig): 要验证的数据库配置对象。 db_type (str): 数据库类型(例如 'mysql')。 Raises: ConfigValidationError: 如果配置无效。 """ logger.info(f"Validating database configuration for {db_type}") try: config.validate() if db_type == 'mysql' and not config.database: raise ConfigValidationError(f"MySQL configuration requires a database name") if db_type == 'mongodb' and config.options.get('uri') and (config.username or config.password): raise ConfigValidationError(f"MongoDB configuration with URI should not specify username/password separately") logger.info(f"Database configuration for {db_type} is valid") except ConfigValidationError as e: logger.error(f"Validation failed for {db_type}: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error validating {db_type}: {str(e)}") raise ConfigValidationError(f"Unexpected error validating {db_type}: {str(e)}") def validate_databases(self, configs: Dict[str, DatabaseConfig]) -> None: """ 验证所有数据库配置,包括跨配置一致性。 Args: configs (Dict[str, DatabaseConfig]): 数据库类型到配置的映射。 Raises: ConfigValidationError: 如果配置无效或不一致。 """ logger.info("Validating all database configurations") if not configs: logger.warning("No database configurations to validate") return for db_type, config in configs.items(): self.validate_database(config, db_type) ports = [(db_type, config.port) for db_type, config in configs.items()] unique_ports = {port for _, port in ports} if len(unique_ports) < len(ports): port_counts = {} for db_type, port in ports: port_counts[port] = port_counts.get(port, []) + [db_type] conflicts = {port: dbs for port, dbs in port_counts.items() if len(dbs) > 1} raise ConfigValidationError(f"Port conflicts detected: {conflicts}") logger.info("All database configurations are valid") def validate_rsa(self, config: RsaConfig) -> None: """ 验证 RSA 配置。 Args: config (RsaConfig): 要验证的 RSA 配置对象。 Raises: ConfigValidationError: 如果配置无效。 """ logger.info("Validating RSA configuration") try: config.validate() logger.info("RSA configuration is valid") except ConfigValidationError as e: logger.error(f"Validation failed for RSA configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error validating RSA configuration: {str(e)}") raise ConfigValidationError(f"Unexpected error validating RSA configuration: {str(e)}") def validate_api_keys(self, config: ApiKeyConfig) -> None: """ 验证 API 密钥配置。 Args: config (ApiKeyConfig): 要验证的 API 密钥配置对象。 Raises: ConfigValidationError: 如果配置无效。 """ logger.info("Validating API key configuration") try: config.validate() logger.info("API key configuration is valid") except ConfigValidationError as e: logger.error(f"Validation failed for API key configuration: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error validating API key configuration: {str(e)}") raise ConfigValidationError(f"Unexpected error validating API key configuration: {str(e)}") def validate(self, configs: Dict[str, any], config_type: str) -> None: """ 通用校验方法,支持不同配置类型。 Args: configs (Dict[str, any]): 配置映射(例如数据库配置)。 config_type (str): 配置类型(例如 'databases', 'rsa', 'api_keys')。 Raises: ConfigValidationError: 如果配置无效。 """ logger.info(f"Validating configurations of type: {config_type}") if config_type == 'databases': if not all(isinstance(config, DatabaseConfig) for config in configs.values()): raise ConfigValidationError(f"All {config_type} configurations must be DatabaseConfig objects") self.validate_databases(configs) elif config_type == 'rsa': if not isinstance(configs, RsaConfig): raise ConfigValidationError("RSA configuration must be a RsaConfig object") self.validate_rsa(configs) elif config_type == 'api_keys': if not isinstance(configs, ApiKeyConfig): raise ConfigValidationError("API key configuration must be a ApiKeyConfig object") self.validate_api_keys(configs) else: logger.warning(f"No validator defined for config type: {config_type}") # 未来扩展:支持其他类型 pass # 示例使用(用于测试) if __name__ == "__main__": from config_loader import ConfigLoader from configuration_manager import ConfigurationManager try: # 初始化 ConfigurationManager 并加载配置 config_manager = ConfigurationManager() config_manager.load_config("config.yaml") # 初始化 ConfigValidator validator = ConfigValidator() # 验证数据库配置 validator.validate(config_manager.database_configs, 'databases') # 验证 RSA 配置 rsa_config = config_manager.get_rsa_config() if rsa_config: validator.validate(rsa_config, 'rsa') # 验证 API 密钥配置 api_key_config = config_manager.get_api_key_config() if api_key_config: validator.validate(api_key_config, 'api_keys') # 测试无效 API 密钥配置 invalid_api_key = ApiKeyConfig(environments={'production': {'key': 'abc123'}}) validator.validate(invalid_api_key, 'api_keys') except ConfigValidationError as e: print(f"Validation Error: {str(e)}") except Exception as e: print(f"Error: {str(e)}") ``` ### DatabaseConfig.py ```python import logging from typing import Dict, Any, Optional from abc import ABC, abstractmethod # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ConfigValidationError(Exception): """自定义异常,用于配置验证错误。""" pass class Config(ABC): """所有配置模型的抽象基类。""" @abstractmethod def validate(self) -> None: """ 验证配置。子类必须实现具体的验证逻辑。 Raises: ConfigValidationError: 如果验证失败。 """ pass @classmethod @abstractmethod def from_dict(cls, data: Dict[str, Any]) -> 'Config': """ 从字典创建配置对象。 Args: data (Dict[str, Any]): 来自 YAML 的配置数据。 Returns: Config: 配置模型的实例。 """ pass class DatabaseConfig(Config): """数据库配置模型。""" def __init__( self, host: str, port: int, username: Optional[str] = None, password: Optional[str] = None, database: Optional[str] = None, options: Optional[Dict[str, Any]] = None ): """ 初始化 DatabaseConfig 对象。 Args: host (str): 数据库主机地址。 port (int): 数据库端口号。 username (Optional[str]): 数据库用户名。 password (Optional[str]): 数据库密码。 database (Optional[str]): 数据库名称。 options (Optional[Dict[str, Any]]): 数据库特定选项。 """ self.host = host self.port = port self.username = username self.password = password self.database = database self.options = options or {} def validate(self) -> None: """ 验证数据库配置。 Raises: ConfigValidationError: 如果所需字段缺失或无效。 """ if not self.host: raise ConfigValidationError("Database host is required") if not isinstance(self.port, int): raise ConfigValidationError("Database port must be an integer") if self.port < 1 or self.port > 65535: raise ConfigValidationError(f"Database port {self.port} is out of valid range (1-65535)") logger.info(f"Validated DatabaseConfig: host={self.host}, port={self.port}") @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'DatabaseConfig': """ 从字典创建 DatabaseConfig。 Args: data (Dict[str, Any]): 包含数据库配置的字典。 Returns: DatabaseConfig: 初始化后的 DatabaseConfig 对象。 Raises: ConfigValidationError: 如果所需字段缺失或无效。 """ try: config = cls( host=data.get('host'), port=data.get('port'), username=data.get('username'), password=data.get('password'), database=data.get('database'), options=data.get('options') ) config.validate() return config except KeyError as e: logger.error(f"Missing required field in database config: {str(e)}") raise ConfigValidationError(f"Missing required field: {str(e)}") except Exception as e: logger.error(f"Failed to create DatabaseConfig: {str(e)}") raise ConfigValidationError(f"Invalid database configuration: {str(e)}") def to_dict(self) -> Dict[str, Any]: """ 将 DatabaseConfig 转换为字典。 Returns: Dict[str, Any]: 配置的字典表示。 """ return { 'host': self.host, 'port': self.port, 'username': self.username, 'password': self.password, 'database': self.database, 'options': self.options } class RsaConfig(Config): """RSA 配置模型,用于存储公钥和私钥。""" def __init__(self, public_key: str, private_key: str): """ 初始化 RsaConfig 对象。 Args: public_key (str): RSA 公钥(PEM 格式)。 private_key (str): RSA 私钥(PEM 格式)。 """ self.public_key = public_key self.private_key = private_key def validate(self) -> None: """ 验证 RSA 配置。 Raises: ConfigValidationError: 如果密钥缺失或格式无效。 """ if not self.public_key: raise ConfigValidationError("RSA public key is required") if not self.private_key: raise ConfigValidationError("RSA private key is required") if not self.public_key.strip().startswith('-----BEGIN PUBLIC KEY-----'): raise ConfigValidationError("RSA public key must be in PEM format") if not self.private_key.strip().startswith('-----BEGIN PRIVATE KEY-----'): raise ConfigValidationError("RSA private key must be in PEM format") logger.info("Validated RsaConfig") @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'RsaConfig': """ 从字典创建 RsaConfig。 Args: data (Dict[str, Any]): 包含 RSA 配置的字典。 Returns: RsaConfig: 初始化后的 RsaConfig 对象。 Raises: ConfigValidationError: 如果所需字段缺失或无效。 """ try: config = cls( public_key=data.get('public_key', ''), private_key=data.get('private_key', '') ) config.validate() return config except KeyError as e: logger.error(f"Missing required field in RSA config: {str(e)}") raise ConfigValidationError(f"Missing required field: {str(e)}") except Exception as e: logger.error(f"Failed to create RsaConfig: {str(e)}") raise ConfigValidationError(f"Invalid RSA configuration: {str(e)}") def to_dict(self) -> Dict[str, Any]: """ 将 RsaConfig 转换为字典。 Returns: Dict[str, Any]: 配置的字典表示。 """ return { 'public_key': self.public_key, 'private_key': self.private_key } class ApiKeyConfig(Config): """API 密钥配置模型,支持多环境密钥配置。""" def __init__(self, environments: Dict[str, Dict[str, str]]): """ 初始化 ApiKeyConfig 对象。 Args: environments (Dict[str, Dict[str, str]]): 环境到密钥配置的映射。 """ self.environments = environments def validate(self) -> None: """ 验证 API 密钥配置。 Raises: ConfigValidationError: 如果环境缺失或密钥无效。 """ if not self.environments: raise ConfigValidationError("At least one environment must be configured for API keys") for env, creds in self.environments.items(): if not isinstance(creds, dict): raise ConfigValidationError(f"Invalid credentials format for environment '{env}'") if 'key' not in creds or not creds['key']: raise ConfigValidationError(f"API key is required for environment '{env}'") if 'secret' not in creds or not creds['secret']: raise ConfigValidationError(f"API secret is required for environment '{env}'") logger.info("Validated ApiKeyConfig") @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'ApiKeyConfig': """ 从字典创建 ApiKeyConfig。 Args: data (Dict[str, Any]): 包含 API 密钥配置的字典。 Returns: ApiKeyConfig: 初始化后的 ApiKeyConfig 对象。 Raises: ConfigValidationError: 如果所需字段缺失或无效。 """ try: environments = data or {} if not isinstance(environments, dict): raise ConfigValidationError("API key configuration must be a dictionary of environments") config = cls(environments=environments) config.validate() return config except KeyError as e: logger.error(f"Missing required field in API key config: {str(e)}") raise ConfigValidationError(f"Missing required field: {str(e)}") except Exception as e: logger.error(f"Failed to create ApiKeyConfig: {str(e)}") raise ConfigValidationError(f"Invalid API key configuration: {str(e)}") def to_dict(self) -> Dict[str, Any]: """ 将 ApiKeyConfig 转换为字典。 Returns: Dict[str, Any]: 配置的字典表示。 """ return self.environments # 示例使用(用于测试) if __name__ == "__main__": try: # 测试 DatabaseConfig db_data = { 'host': 'localhost', 'port': 3306, 'username': 'user', 'password': 'pass', 'database': 'mydb', 'options': {'charset': 'utf8mb4'} } db_config = DatabaseConfig.from_dict(db_data) print("DatabaseConfig:", db_config.to_dict()) # 测试 RsaConfig rsa_data = { 'public_key': '-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1234567890abcdef\n-----END PUBLIC KEY-----', 'private_key': '-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBA1234567890\n-----END PRIVATE KEY-----' } rsa_config = RsaConfig.from_dict(rsa_data) print("RsaConfig:", rsa_config.to_dict()) # 测试 ApiKeyConfig api_key_data = { 'production': {'key': 'abc123', 'secret': 'xyz789'}, 'staging': {'key': 'def456', 'secret': 'uvw012'} } api_key_config = ApiKeyConfig.from_dict(api_key_data) print("ApiKeyConfig:", api_key_config.to_dict()) # 测试无效 API 密钥配置 invalid_api_key_data = {'production': {'key': 'abc123'}} # 缺少 secret api_key_config_invalid = ApiKeyConfig.from_dict(invalid_api_key_data) except ConfigValidationError as e: print(f"Validation Error: {str(e)}") except Exception as e: print(f"Error: {str(e)}") ```
admin
2025年4月24日 16:48
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码