Skip to Content

错误处理

完善的错误处理策略,确保应用稳定可靠。

基础错误处理

import asyncio from streamind_sdk import SDK, Config, Signal async def main(): sdk = SDK() config = Config(...) sdk.register_terminal("terminal-1", config) # 连接错误处理 try: await sdk.connect("terminal-1") print("连接成功") except ConnectionError as e: print(f"连接失败: {e}") return except Exception as e: print(f"未知错误: {e}") return # 发送信号错误处理 signal = Signal("sensor.data") signal.get_payload().set_number("value", 25.5) try: await sdk.send_signal("terminal-1", signal) print("信号发送成功") except Exception as e: print(f"发送失败: {e}") # 断开连接 try: await sdk.disconnect("terminal-1") except Exception as e: print(f"断开连接时出错: {e}") asyncio.run(main())

重试机制

连接重试

import asyncio from streamind_sdk import SDK, Config async def connect_with_retry(sdk, terminal_id, max_retries=3, retry_delay=5): """带重试的连接""" for attempt in range(max_retries): try: print(f"连接尝试 {attempt + 1}/{max_retries}...") await sdk.connect(terminal_id) print("连接成功") return True except Exception as e: print(f"连接失败: {e}") if attempt < max_retries - 1: print(f"等待 {retry_delay} 秒后重试...") await asyncio.sleep(retry_delay) else: print("达到最大重试次数,连接失败") return False async def main(): sdk = SDK() config = Config(...) sdk.register_terminal("terminal-1", config) # 带重试的连接 success = await connect_with_retry(sdk, "terminal-1", max_retries=5) if success: # 继续后续操作 await asyncio.sleep(60) await sdk.disconnect("terminal-1") else: print("无法建立连接") asyncio.run(main())

发送重试

import asyncio from streamind_sdk import SDK, Signal async def send_with_retry(sdk, terminal_id, signal, max_retries=3): """带重试的信号发送""" for attempt in range(max_retries): try: await sdk.send_signal(terminal_id, signal) print(f"信号发送成功(尝试 {attempt + 1})") return True except Exception as e: print(f"发送失败(尝试 {attempt + 1}): {e}") if attempt < max_retries - 1: await asyncio.sleep(1) else: print("达到最大重试次数,发送失败") return False # 使用 signal = Signal("sensor.data") signal.get_payload().set_number("value", 25.5) success = await send_with_retry(sdk, "terminal-1", signal, max_retries=5)

指数退避重试

import asyncio import random async def send_with_exponential_backoff(sdk, terminal_id, signal, max_retries=5): """指数退避重试""" base_delay = 1 # 基础延迟(秒) max_delay = 60 # 最大延迟(秒) for attempt in range(max_retries): try: await sdk.send_signal(terminal_id, signal) print(f"发送成功(尝试 {attempt + 1})") return True except Exception as e: print(f"发送失败(尝试 {attempt + 1}): {e}") if attempt < max_retries - 1: # 计算延迟时间:2^attempt * base_delay + 随机抖动 delay = min(base_delay * (2 ** attempt), max_delay) jitter = random.uniform(0, 0.1 * delay) # 10% 抖动 total_delay = delay + jitter print(f"等待 {total_delay:.2f} 秒后重试...") await asyncio.sleep(total_delay) else: print("达到最大重试次数") return False # 使用 signal = Signal("sensor.data") success = await send_with_exponential_backoff(sdk, "terminal-1", signal)

超时处理

import asyncio async def connect_with_timeout(sdk, terminal_id, timeout=10): """带超时的连接""" try: await asyncio.wait_for( sdk.connect(terminal_id), timeout=timeout ) print("连接成功") return True except asyncio.TimeoutError: print(f"连接超时({timeout}秒)") return False except Exception as e: print(f"连接失败: {e}") return False async def send_with_timeout(sdk, terminal_id, signal, timeout=5): """带超时的发送""" try: await asyncio.wait_for( sdk.send_signal(terminal_id, signal), timeout=timeout ) print("发送成功") return True except asyncio.TimeoutError: print(f"发送超时({timeout}秒)") return False except Exception as e: print(f"发送失败: {e}") return False # 使用 await connect_with_timeout(sdk, "terminal-1", timeout=15) signal = Signal("sensor.data") await send_with_timeout(sdk, "terminal-1", signal, timeout=3)

连接状态监控

import asyncio from streamind_sdk import SDK, Config class ConnectionMonitor: def __init__(self, sdk, terminal_id): self.sdk = sdk self.terminal_id = terminal_id self.is_connected = False self.reconnect_attempts = 0 self.max_reconnect_attempts = 10 async def setup(self): """设置连接监控""" def on_connection(status, message): if status == "connected": self.is_connected = True self.reconnect_attempts = 0 print("连接已建立") elif status == "disconnected": self.is_connected = False print(f"连接断开: {message}") # 触发重连 asyncio.create_task(self.auto_reconnect()) elif status == "error": print(f"连接错误: {message}") self.sdk.set_connection_callback(self.terminal_id, on_connection) async def auto_reconnect(self): """自动重连""" while not self.is_connected and self.reconnect_attempts < self.max_reconnect_attempts: self.reconnect_attempts += 1 print(f"重连尝试 {self.reconnect_attempts}/{self.max_reconnect_attempts}") try: await asyncio.sleep(5) # 等待5秒 await self.sdk.connect(self.terminal_id) except Exception as e: print(f"重连失败: {e}") if not self.is_connected: print("达到最大重连次数,放弃重连") # 使用 async def main(): sdk = SDK() config = Config(...) sdk.register_terminal("terminal-1", config) # 设置监控 monitor = ConnectionMonitor(sdk, "terminal-1") await monitor.setup() # 初始连接 await sdk.connect("terminal-1") # 保持运行 await asyncio.sleep(3600) asyncio.run(main())

批量操作错误处理

import asyncio from streamind_sdk import SDK, Signal async def send_to_multiple_terminals(sdk, terminal_ids, signal): """向多个终端发送,处理部分失败""" results = {} tasks = [ sdk.send_signal(terminal_id, signal) for terminal_id in terminal_ids ] # 使用 return_exceptions=True 继续执行即使有错误 outcomes = await asyncio.gather(*tasks, return_exceptions=True) for terminal_id, outcome in zip(terminal_ids, outcomes): if isinstance(outcome, Exception): results[terminal_id] = {"success": False, "error": str(outcome)} print(f"{terminal_id}: 发送失败 - {outcome}") else: results[terminal_id] = {"success": True} print(f"{terminal_id}: 发送成功") return results # 使用 signal = Signal("sensor.data") signal.get_payload().set_number("value", 25.5) terminal_ids = ["terminal-1", "terminal-2", "terminal-3"] results = await send_to_multiple_terminals(sdk, terminal_ids, signal) # 统计 success_count = sum(1 for r in results.values() if r["success"]) total_count = len(results) print(f"\n发送完成: {success_count}/{total_count} 成功")

配置验证

from streamind_sdk import Config def validate_config(config: Config) -> tuple[bool, str]: """验证配置""" # 检查必需字段 if not config.device_id: return False, "device_id 不能为空" if not config.device_type: return False, "device_type 不能为空" if not config.endpoint: return False, "endpoint 不能为空" if not config.endpoint.startswith("wss://"): return False, "endpoint 必须使用 wss:// 协议" if not config.tenant_id: return False, "tenant_id 不能为空" if not config.product_id: return False, "product_id 不能为空" if not config.product_key: return False, "product_key 不能为空" return True, "配置有效" # 使用 config = Config(...) is_valid, message = validate_config(config) if is_valid: sdk.register_terminal("terminal-1", config) else: print(f"配置无效: {message}")

数据验证

from streamind_sdk import Signal class SensorDataValidator: """传感器数据验证器""" @staticmethod def validate_temperature(value: float) -> tuple[bool, str]: """验证温度值""" if not isinstance(value, (int, float)): return False, "温度必须是数字" if not -50 <= value <= 150: return False, f"温度超出范围: {value}°C" return True, "有效" @staticmethod def create_validated_signal(temperature: float) -> Signal: """创建经过验证的温度信号""" is_valid, message = SensorDataValidator.validate_temperature(temperature) if not is_valid: raise ValueError(f"数据验证失败: {message}") signal = Signal("sensor.temperature") signal.get_payload().set_number("value", temperature) signal.get_payload().set_string("unit", "celsius") return signal # 使用 try: signal = SensorDataValidator.create_validated_signal(25.5) await sdk.send_signal("terminal-1", signal) except ValueError as e: print(f"验证错误: {e}")

日志记录

import asyncio import logging from streamind_sdk import SDK, Config, Signal # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler('streamind_sdk.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) async def main(): try: sdk = SDK() config = Config(...) sdk.register_terminal("terminal-1", config) logger.info("开始连接到 StreamInd 平台") await sdk.connect("terminal-1") logger.info("连接成功") signal = Signal("sensor.data") signal.get_payload().set_number("value", 25.5) logger.info("发送传感器数据") await sdk.send_signal("terminal-1", signal) logger.info("数据发送成功") await asyncio.sleep(60) logger.info("断开连接") await sdk.disconnect("terminal-1") logger.info("已断开连接") except Exception as e: logger.error(f"应用程序错误: {e}", exc_info=True) raise asyncio.run(main())

优雅退出

import asyncio import signal from streamind_sdk import SDK, Config class Application: def __init__(self): self.sdk = SDK() self.running = True async def start(self): """启动应用""" config = Config(...) self.sdk.register_terminal("terminal-1", config) # 连接 await self.sdk.connect("terminal-1") print("应用已启动") # 主循环 while self.running: try: # 应用逻辑... await asyncio.sleep(1) except Exception as e: print(f"运行时错误: {e}") # 清理 await self.cleanup() async def cleanup(self): """清理资源""" print("正在清理资源...") try: await self.sdk.disconnect_all() print("资源清理完成") except Exception as e: print(f"清理时出错: {e}") def stop(self): """停止应用""" print("收到停止信号") self.running = False # 使用 async def main(): app = Application() # 设置信号处理 loop = asyncio.get_event_loop() def signal_handler(): app.stop() loop.add_signal_handler(signal.SIGINT, signal_handler) loop.add_signal_handler(signal.SIGTERM, signal_handler) try: await app.start() except Exception as e: print(f"应用错误: {e}") finally: print("应用已退出") asyncio.run(main())

最佳实践

  1. 异常捕获 - 捕获所有可能的异常,避免程序崩溃
  2. 重试机制 - 网络操作使用指数退避重试
  3. 超时设置 - 为所有异步操作设置合理超时
  4. 日志记录 - 记录所有关键操作和错误
  5. 优雅退出 - 确保资源正确释放
  6. 数据验证 - 发送前验证数据有效性
  7. 状态监控 - 实时监控连接状态并自动恢复

相关文档

Last updated on