错误处理
完善的错误处理策略,确保应用稳定可靠。
基础错误处理
import asyncio
from streamind_sdk import SDK, Config, Signal
async def main():
sdk = SDK()
config = Config(...)
sdk.register_terminal("terminal-1", config)
# 连接错误处理
try:
await sdk.connect("terminal-1")
print("连接成功")
except ConnectionError as e:
print(f"连接失败: {e}")
return
except Exception as e:
print(f"未知错误: {e}")
return
# 发送信号错误处理
signal = Signal("sensor.data")
signal.get_payload().set_number("value", 25.5)
try:
await sdk.send_signal("terminal-1", signal)
print("信号发送成功")
except Exception as e:
print(f"发送失败: {e}")
# 断开连接
try:
await sdk.disconnect("terminal-1")
except Exception as e:
print(f"断开连接时出错: {e}")
asyncio.run(main())重试机制
连接重试
import asyncio
from streamind_sdk import SDK, Config
async def connect_with_retry(sdk, terminal_id, max_retries=3, retry_delay=5):
"""带重试的连接"""
for attempt in range(max_retries):
try:
print(f"连接尝试 {attempt + 1}/{max_retries}...")
await sdk.connect(terminal_id)
print("连接成功")
return True
except Exception as e:
print(f"连接失败: {e}")
if attempt < max_retries - 1:
print(f"等待 {retry_delay} 秒后重试...")
await asyncio.sleep(retry_delay)
else:
print("达到最大重试次数,连接失败")
return False
async def main():
sdk = SDK()
config = Config(...)
sdk.register_terminal("terminal-1", config)
# 带重试的连接
success = await connect_with_retry(sdk, "terminal-1", max_retries=5)
if success:
# 继续后续操作
await asyncio.sleep(60)
await sdk.disconnect("terminal-1")
else:
print("无法建立连接")
asyncio.run(main())发送重试
import asyncio
from streamind_sdk import SDK, Signal
async def send_with_retry(sdk, terminal_id, signal, max_retries=3):
"""带重试的信号发送"""
for attempt in range(max_retries):
try:
await sdk.send_signal(terminal_id, signal)
print(f"信号发送成功(尝试 {attempt + 1})")
return True
except Exception as e:
print(f"发送失败(尝试 {attempt + 1}): {e}")
if attempt < max_retries - 1:
await asyncio.sleep(1)
else:
print("达到最大重试次数,发送失败")
return False
# 使用
signal = Signal("sensor.data")
signal.get_payload().set_number("value", 25.5)
success = await send_with_retry(sdk, "terminal-1", signal, max_retries=5)指数退避重试
import asyncio
import random
async def send_with_exponential_backoff(sdk, terminal_id, signal, max_retries=5):
"""指数退避重试"""
base_delay = 1 # 基础延迟(秒)
max_delay = 60 # 最大延迟(秒)
for attempt in range(max_retries):
try:
await sdk.send_signal(terminal_id, signal)
print(f"发送成功(尝试 {attempt + 1})")
return True
except Exception as e:
print(f"发送失败(尝试 {attempt + 1}): {e}")
if attempt < max_retries - 1:
# 计算延迟时间:2^attempt * base_delay + 随机抖动
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, 0.1 * delay) # 10% 抖动
total_delay = delay + jitter
print(f"等待 {total_delay:.2f} 秒后重试...")
await asyncio.sleep(total_delay)
else:
print("达到最大重试次数")
return False
# 使用
signal = Signal("sensor.data")
success = await send_with_exponential_backoff(sdk, "terminal-1", signal)超时处理
import asyncio
async def connect_with_timeout(sdk, terminal_id, timeout=10):
"""带超时的连接"""
try:
await asyncio.wait_for(
sdk.connect(terminal_id),
timeout=timeout
)
print("连接成功")
return True
except asyncio.TimeoutError:
print(f"连接超时({timeout}秒)")
return False
except Exception as e:
print(f"连接失败: {e}")
return False
async def send_with_timeout(sdk, terminal_id, signal, timeout=5):
"""带超时的发送"""
try:
await asyncio.wait_for(
sdk.send_signal(terminal_id, signal),
timeout=timeout
)
print("发送成功")
return True
except asyncio.TimeoutError:
print(f"发送超时({timeout}秒)")
return False
except Exception as e:
print(f"发送失败: {e}")
return False
# 使用
await connect_with_timeout(sdk, "terminal-1", timeout=15)
signal = Signal("sensor.data")
await send_with_timeout(sdk, "terminal-1", signal, timeout=3)连接状态监控
import asyncio
from streamind_sdk import SDK, Config
class ConnectionMonitor:
def __init__(self, sdk, terminal_id):
self.sdk = sdk
self.terminal_id = terminal_id
self.is_connected = False
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10
async def setup(self):
"""设置连接监控"""
def on_connection(status, message):
if status == "connected":
self.is_connected = True
self.reconnect_attempts = 0
print("连接已建立")
elif status == "disconnected":
self.is_connected = False
print(f"连接断开: {message}")
# 触发重连
asyncio.create_task(self.auto_reconnect())
elif status == "error":
print(f"连接错误: {message}")
self.sdk.set_connection_callback(self.terminal_id, on_connection)
async def auto_reconnect(self):
"""自动重连"""
while not self.is_connected and self.reconnect_attempts < self.max_reconnect_attempts:
self.reconnect_attempts += 1
print(f"重连尝试 {self.reconnect_attempts}/{self.max_reconnect_attempts}")
try:
await asyncio.sleep(5) # 等待5秒
await self.sdk.connect(self.terminal_id)
except Exception as e:
print(f"重连失败: {e}")
if not self.is_connected:
print("达到最大重连次数,放弃重连")
# 使用
async def main():
sdk = SDK()
config = Config(...)
sdk.register_terminal("terminal-1", config)
# 设置监控
monitor = ConnectionMonitor(sdk, "terminal-1")
await monitor.setup()
# 初始连接
await sdk.connect("terminal-1")
# 保持运行
await asyncio.sleep(3600)
asyncio.run(main())批量操作错误处理
import asyncio
from streamind_sdk import SDK, Signal
async def send_to_multiple_terminals(sdk, terminal_ids, signal):
"""向多个终端发送,处理部分失败"""
results = {}
tasks = [
sdk.send_signal(terminal_id, signal)
for terminal_id in terminal_ids
]
# 使用 return_exceptions=True 继续执行即使有错误
outcomes = await asyncio.gather(*tasks, return_exceptions=True)
for terminal_id, outcome in zip(terminal_ids, outcomes):
if isinstance(outcome, Exception):
results[terminal_id] = {"success": False, "error": str(outcome)}
print(f"{terminal_id}: 发送失败 - {outcome}")
else:
results[terminal_id] = {"success": True}
print(f"{terminal_id}: 发送成功")
return results
# 使用
signal = Signal("sensor.data")
signal.get_payload().set_number("value", 25.5)
terminal_ids = ["terminal-1", "terminal-2", "terminal-3"]
results = await send_to_multiple_terminals(sdk, terminal_ids, signal)
# 统计
success_count = sum(1 for r in results.values() if r["success"])
total_count = len(results)
print(f"\n发送完成: {success_count}/{total_count} 成功")配置验证
from streamind_sdk import Config
def validate_config(config: Config) -> tuple[bool, str]:
"""验证配置"""
# 检查必需字段
if not config.device_id:
return False, "device_id 不能为空"
if not config.device_type:
return False, "device_type 不能为空"
if not config.endpoint:
return False, "endpoint 不能为空"
if not config.endpoint.startswith("wss://"):
return False, "endpoint 必须使用 wss:// 协议"
if not config.tenant_id:
return False, "tenant_id 不能为空"
if not config.product_id:
return False, "product_id 不能为空"
if not config.product_key:
return False, "product_key 不能为空"
return True, "配置有效"
# 使用
config = Config(...)
is_valid, message = validate_config(config)
if is_valid:
sdk.register_terminal("terminal-1", config)
else:
print(f"配置无效: {message}")数据验证
from streamind_sdk import Signal
class SensorDataValidator:
"""传感器数据验证器"""
@staticmethod
def validate_temperature(value: float) -> tuple[bool, str]:
"""验证温度值"""
if not isinstance(value, (int, float)):
return False, "温度必须是数字"
if not -50 <= value <= 150:
return False, f"温度超出范围: {value}°C"
return True, "有效"
@staticmethod
def create_validated_signal(temperature: float) -> Signal:
"""创建经过验证的温度信号"""
is_valid, message = SensorDataValidator.validate_temperature(temperature)
if not is_valid:
raise ValueError(f"数据验证失败: {message}")
signal = Signal("sensor.temperature")
signal.get_payload().set_number("value", temperature)
signal.get_payload().set_string("unit", "celsius")
return signal
# 使用
try:
signal = SensorDataValidator.create_validated_signal(25.5)
await sdk.send_signal("terminal-1", signal)
except ValueError as e:
print(f"验证错误: {e}")日志记录
import asyncio
import logging
from streamind_sdk import SDK, Config, Signal
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler('streamind_sdk.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
async def main():
try:
sdk = SDK()
config = Config(...)
sdk.register_terminal("terminal-1", config)
logger.info("开始连接到 StreamInd 平台")
await sdk.connect("terminal-1")
logger.info("连接成功")
signal = Signal("sensor.data")
signal.get_payload().set_number("value", 25.5)
logger.info("发送传感器数据")
await sdk.send_signal("terminal-1", signal)
logger.info("数据发送成功")
await asyncio.sleep(60)
logger.info("断开连接")
await sdk.disconnect("terminal-1")
logger.info("已断开连接")
except Exception as e:
logger.error(f"应用程序错误: {e}", exc_info=True)
raise
asyncio.run(main())优雅退出
import asyncio
import signal
from streamind_sdk import SDK, Config
class Application:
def __init__(self):
self.sdk = SDK()
self.running = True
async def start(self):
"""启动应用"""
config = Config(...)
self.sdk.register_terminal("terminal-1", config)
# 连接
await self.sdk.connect("terminal-1")
print("应用已启动")
# 主循环
while self.running:
try:
# 应用逻辑...
await asyncio.sleep(1)
except Exception as e:
print(f"运行时错误: {e}")
# 清理
await self.cleanup()
async def cleanup(self):
"""清理资源"""
print("正在清理资源...")
try:
await self.sdk.disconnect_all()
print("资源清理完成")
except Exception as e:
print(f"清理时出错: {e}")
def stop(self):
"""停止应用"""
print("收到停止信号")
self.running = False
# 使用
async def main():
app = Application()
# 设置信号处理
loop = asyncio.get_event_loop()
def signal_handler():
app.stop()
loop.add_signal_handler(signal.SIGINT, signal_handler)
loop.add_signal_handler(signal.SIGTERM, signal_handler)
try:
await app.start()
except Exception as e:
print(f"应用错误: {e}")
finally:
print("应用已退出")
asyncio.run(main())最佳实践
- 异常捕获 - 捕获所有可能的异常,避免程序崩溃
- 重试机制 - 网络操作使用指数退避重试
- 超时设置 - 为所有异步操作设置合理超时
- 日志记录 - 记录所有关键操作和错误
- 优雅退出 - 确保资源正确释放
- 数据验证 - 发送前验证数据有效性
- 状态监控 - 实时监控连接状态并自动恢复
相关文档
Last updated on