Skip to Content

多终端管理

使用单个 SDK 实例管理多个设备连接,适用于网关、集中控制器等场景。

基础多终端连接

import asyncio from streamind_sdk import SDK, Config, Signal async def main(): # 创建 SDK 实例 sdk = SDK() # 为不同设备创建配置 config1 = Config( device_id="sensor-001", device_type="temperature", endpoint="wss://your-platform.com/signals", tenant_id="your-tenant-id", product_id="product-sensors", product_key="your-secret-key" ) config2 = Config( device_id="sensor-002", device_type="humidity", endpoint="wss://your-platform.com/signals", tenant_id="your-tenant-id", product_id="product-sensors", product_key="your-secret-key" ) config3 = Config( device_id="actuator-001", device_type="motor", endpoint="wss://your-platform.com/signals", tenant_id="your-tenant-id", product_id="product-actuators", product_key="your-actuators-key" ) # 注册所有终端 sdk.register_terminal("sensor-1", config1) sdk.register_terminal("sensor-2", config2) sdk.register_terminal("actuator-1", config3) print("注册了 3 个终端") # 批量连接所有终端 results = await sdk.connect_all() # 检查连接结果 for terminal_id, success in results.items(): status = "成功" if success else "失败" print(f"{terminal_id}: 连接{status}") # 向不同终端发送信号 # 发送温度数据 signal1 = Signal("sensor.temperature") signal1.get_payload().set_number("value", 25.5) await sdk.send_signal("sensor-1", signal1) print("温度数据已发送") # 发送湿度数据 signal2 = Signal("sensor.humidity") signal2.get_payload().set_number("value", 60.0) await sdk.send_signal("sensor-2", signal2) print("湿度数据已发送") # 控制执行器 signal3 = Signal("actuator.control") signal3.get_payload().set_string("action", "start") await sdk.send_signal("actuator-1", signal3) print("执行器控制信号已发送") # 保持连接 await asyncio.sleep(60) # 批量断开所有终端 await sdk.disconnect_all() print("所有终端已断开") asyncio.run(main())

为每个终端设置独立回调

import asyncio from streamind_sdk import SDK, Config async def main(): sdk = SDK() # 注册终端 sdk.register_terminal("sensor-1", config1) sdk.register_terminal("sensor-2", config2) sdk.register_terminal("actuator-1", config3) # 为每个终端设置独立的指令回调 def on_sensor1_directive(directive): print(f"[传感器1] 收到指令: {directive.name}") # 处理传感器1的指令 def on_sensor2_directive(directive): print(f"[传感器2] 收到指令: {directive.name}") # 处理传感器2的指令 def on_actuator_directive(directive): print(f"[执行器] 收到指令: {directive.name}") payload = directive.get_payload() action = payload.get_string("action") # 执行控制动作 sdk.set_directive_callback("sensor-1", on_sensor1_directive) sdk.set_directive_callback("sensor-2", on_sensor2_directive) sdk.set_directive_callback("actuator-1", on_actuator_directive) # 为每个终端设置独立的连接状态回调 def on_sensor1_connection(status, message): print(f"[传感器1] 连接状态: {status}") def on_sensor2_connection(status, message): print(f"[传感器2] 连接状态: {status}") def on_actuator_connection(status, message): print(f"[执行器] 连接状态: {status}") sdk.set_connection_callback("sensor-1", on_sensor1_connection) sdk.set_connection_callback("sensor-2", on_sensor2_connection) sdk.set_connection_callback("actuator-1", on_actuator_connection) # 连接所有终端 await sdk.connect_all() # 保持运行 await asyncio.sleep(3600) await sdk.disconnect_all() asyncio.run(main())

动态终端管理

运行时动态添加和移除终端:

import asyncio from streamind_sdk import SDK, Config class TerminalManager: def __init__(self): self.sdk = SDK() self.terminals = {} async def add_terminal(self, terminal_id, config): """动态添加终端""" try: # 注册终端 self.sdk.register_terminal(terminal_id, config) # 设置回调 def on_directive(directive): print(f"[{terminal_id}] 收到指令: {directive.name}") self.sdk.set_directive_callback(terminal_id, on_directive) # 连接 await self.sdk.connect(terminal_id) # 记录终端 self.terminals[terminal_id] = config print(f"终端 {terminal_id} 已添加并连接") return True except Exception as e: print(f"添加终端 {terminal_id} 失败: {e}") return False async def remove_terminal(self, terminal_id): """动态移除终端""" try: if terminal_id in self.terminals: # 断开连接 await self.sdk.disconnect(terminal_id) # 移除记录 del self.terminals[terminal_id] print(f"终端 {terminal_id} 已移除") return True else: print(f"终端 {terminal_id} 不存在") return False except Exception as e: print(f"移除终端 {terminal_id} 失败: {e}") return False async def send_to_terminal(self, terminal_id, signal): """向指定终端发送信号""" try: await self.sdk.send_signal(terminal_id, signal) return True except Exception as e: print(f"发送失败: {e}") return False def list_terminals(self): """列出所有终端""" return list(self.terminals.keys()) async def close_all(self): """关闭所有终端""" await self.sdk.disconnect_all() self.terminals.clear() # 使用示例 async def main(): manager = TerminalManager() # 动态添加终端 config1 = Config(...) await manager.add_terminal("sensor-1", config1) config2 = Config(...) await manager.add_terminal("sensor-2", config2) # 列出终端 print(f"当前终端: {manager.list_terminals()}") # 发送数据 from streamind_sdk import Signal signal = Signal("sensor.data") signal.get_payload().set_number("value", 25.5) await manager.send_to_terminal("sensor-1", signal) # 移除终端 await manager.remove_terminal("sensor-2") # 清理 await manager.close_all() asyncio.run(main())

网关场景

作为网关管理多个下游设备:

import asyncio from streamind_sdk import SDK, Config, Signal class Gateway: def __init__(self, gateway_config): self.sdk = SDK() self.devices = {} async def register_device(self, device_id, device_config): """注册设备""" self.sdk.register_terminal(device_id, device_config) self.devices[device_id] = { "config": device_config, "connected": False } # 设置回调 def on_directive(directive): self.handle_device_directive(device_id, directive) def on_connection(status, message): self.devices[device_id]["connected"] = (status == "connected") print(f"设备 {device_id} 连接状态: {status}") self.sdk.set_directive_callback(device_id, on_directive) self.sdk.set_connection_callback(device_id, on_connection) def handle_device_directive(self, device_id, directive): """处理设备指令""" print(f"设备 {device_id} 收到指令: {directive.name}") # 根据指令类型分发处理 if directive.name == "device.query": asyncio.create_task(self.respond_to_query(device_id, directive)) elif directive.name == "device.control": asyncio.create_task(self.control_device(device_id, directive)) async def respond_to_query(self, device_id, directive): """响应查询指令""" signal = Signal("device.status") signal.get_payload().set_string("status", "online") signal.get_payload().set_boolean("connected", self.devices[device_id]["connected"]) await self.sdk.send_signal(device_id, signal) async def control_device(self, device_id, directive): """控制设备""" payload = directive.get_payload() action = payload.get_string("action") print(f"执行设备 {device_id} 的动作: {action}") # 执行控制逻辑... async def start(self): """启动网关""" # 连接所有设备 results = await self.sdk.connect_all() for device_id, success in results.items(): status = "成功" if success else "失败" print(f"设备 {device_id} 连接{status}") async def broadcast_signal(self, signal): """广播信号到所有设备""" tasks = [] for device_id in self.devices.keys(): task = self.sdk.send_signal(device_id, signal) tasks.append(task) await asyncio.gather(*tasks, return_exceptions=True) async def stop(self): """停止网关""" await self.sdk.disconnect_all() # 使用网关 async def main(): # 创建网关配置 gateway = Gateway(gateway_config=None) # 注册设备 device1_config = Config(...) await gateway.register_device("device-1", device1_config) device2_config = Config(...) await gateway.register_device("device-2", device2_config) # 启动网关 await gateway.start() # 广播信号 broadcast_signal = Signal("gateway.announcement") broadcast_signal.get_payload().set_string("message", "Gateway online") await gateway.broadcast_signal(broadcast_signal) # 保持运行 await asyncio.sleep(3600) # 停止网关 await gateway.stop() asyncio.run(main())

并发发送数据

同时向多个终端发送数据:

import asyncio from streamind_sdk import SDK, Config, Signal async def send_to_all_sensors(sdk, sensor_ids, data): """并发发送数据到所有传感器""" tasks = [] for sensor_id in sensor_ids: signal = Signal("sensor.data") payload = signal.get_payload() payload.set_number("value", data["value"]) payload.set_string("timestamp", data["timestamp"]) # 创建发送任务 task = sdk.send_signal(sensor_id, signal) tasks.append(task) # 并发执行所有发送任务 results = await asyncio.gather(*tasks, return_exceptions=True) # 检查结果 for i, result in enumerate(results): sensor_id = sensor_ids[i] if isinstance(result, Exception): print(f"{sensor_id}: 发送失败 - {result}") else: print(f"{sensor_id}: 发送成功") async def main(): sdk = SDK() # 注册多个传感器 sensor_ids = ["sensor-1", "sensor-2", "sensor-3", "sensor-4"] for sensor_id in sensor_ids: config = Config(device_id=sensor_id, ...) sdk.register_terminal(sensor_id, config) # 连接所有传感器 await sdk.connect_all() # 并发发送数据 data = { "value": 25.5, "timestamp": "2024-01-01T00:00:00Z" } await send_to_all_sensors(sdk, sensor_ids, data) await sdk.disconnect_all() asyncio.run(main())

终端状态监控

监控所有终端的连接状态:

import asyncio from streamind_sdk import SDK, Config class TerminalMonitor: def __init__(self): self.sdk = SDK() self.terminal_status = {} def register_with_monitoring(self, terminal_id, config): """注册终端并启用监控""" self.sdk.register_terminal(terminal_id, config) self.terminal_status[terminal_id] = { "connected": False, "last_error": None } def on_connection(status, message): self.terminal_status[terminal_id]["connected"] = (status == "connected") if status == "error": self.terminal_status[terminal_id]["last_error"] = message print(f"[监控] {terminal_id} 状态: {status}") self.sdk.set_connection_callback(terminal_id, on_connection) def get_status_summary(self): """获取所有终端状态摘要""" connected_count = sum(1 for s in self.terminal_status.values() if s["connected"]) total_count = len(self.terminal_status) return { "total": total_count, "connected": connected_count, "disconnected": total_count - connected_count, "details": self.terminal_status } async def check_and_reconnect(self): """检查并重连断开的终端""" for terminal_id, status in self.terminal_status.items(): if not status["connected"]: try: print(f"尝试重连 {terminal_id}...") await self.sdk.connect(terminal_id) except Exception as e: print(f"重连 {terminal_id} 失败: {e}") # 使用监控 async def main(): monitor = TerminalMonitor() # 注册终端 config1 = Config(...) monitor.register_with_monitoring("sensor-1", config1) config2 = Config(...) monitor.register_with_monitoring("sensor-2", config2) # 连接 await monitor.sdk.connect_all() # 定期检查状态 while True: await asyncio.sleep(30) # 每30秒检查一次 summary = monitor.get_status_summary() print(f"\n状态摘要: {summary['connected']}/{summary['total']} 已连接") # 尝试重连 await monitor.check_and_reconnect() asyncio.run(main())

最佳实践

  1. 资源管理 - 使用单个 SDK 实例管理所有终端
  2. 错误隔离 - 单个终端的错误不应影响其他终端
  3. 并发控制 - 使用 asyncio.gather 并发操作多个终端
  4. 状态监控 - 实时监控所有终端的连接状态
  5. 优雅关闭 - 确保所有终端正确断开连接

下一步

Last updated on