多终端管理
使用单个 SDK 实例管理多个设备连接,适用于网关、集中控制器等场景。
基础多终端连接
import asyncio
from streamind_sdk import SDK, Config, Signal
async def main():
# 创建 SDK 实例
sdk = SDK()
# 为不同设备创建配置
config1 = Config(
device_id="sensor-001",
device_type="temperature",
endpoint="wss://your-platform.com/signals",
tenant_id="your-tenant-id",
product_id="product-sensors",
product_key="your-secret-key"
)
config2 = Config(
device_id="sensor-002",
device_type="humidity",
endpoint="wss://your-platform.com/signals",
tenant_id="your-tenant-id",
product_id="product-sensors",
product_key="your-secret-key"
)
config3 = Config(
device_id="actuator-001",
device_type="motor",
endpoint="wss://your-platform.com/signals",
tenant_id="your-tenant-id",
product_id="product-actuators",
product_key="your-actuators-key"
)
# 注册所有终端
sdk.register_terminal("sensor-1", config1)
sdk.register_terminal("sensor-2", config2)
sdk.register_terminal("actuator-1", config3)
print("注册了 3 个终端")
# 批量连接所有终端
results = await sdk.connect_all()
# 检查连接结果
for terminal_id, success in results.items():
status = "成功" if success else "失败"
print(f"{terminal_id}: 连接{status}")
# 向不同终端发送信号
# 发送温度数据
signal1 = Signal("sensor.temperature")
signal1.get_payload().set_number("value", 25.5)
await sdk.send_signal("sensor-1", signal1)
print("温度数据已发送")
# 发送湿度数据
signal2 = Signal("sensor.humidity")
signal2.get_payload().set_number("value", 60.0)
await sdk.send_signal("sensor-2", signal2)
print("湿度数据已发送")
# 控制执行器
signal3 = Signal("actuator.control")
signal3.get_payload().set_string("action", "start")
await sdk.send_signal("actuator-1", signal3)
print("执行器控制信号已发送")
# 保持连接
await asyncio.sleep(60)
# 批量断开所有终端
await sdk.disconnect_all()
print("所有终端已断开")
asyncio.run(main())为每个终端设置独立回调
import asyncio
from streamind_sdk import SDK, Config
async def main():
sdk = SDK()
# 注册终端
sdk.register_terminal("sensor-1", config1)
sdk.register_terminal("sensor-2", config2)
sdk.register_terminal("actuator-1", config3)
# 为每个终端设置独立的指令回调
def on_sensor1_directive(directive):
print(f"[传感器1] 收到指令: {directive.name}")
# 处理传感器1的指令
def on_sensor2_directive(directive):
print(f"[传感器2] 收到指令: {directive.name}")
# 处理传感器2的指令
def on_actuator_directive(directive):
print(f"[执行器] 收到指令: {directive.name}")
payload = directive.get_payload()
action = payload.get_string("action")
# 执行控制动作
sdk.set_directive_callback("sensor-1", on_sensor1_directive)
sdk.set_directive_callback("sensor-2", on_sensor2_directive)
sdk.set_directive_callback("actuator-1", on_actuator_directive)
# 为每个终端设置独立的连接状态回调
def on_sensor1_connection(status, message):
print(f"[传感器1] 连接状态: {status}")
def on_sensor2_connection(status, message):
print(f"[传感器2] 连接状态: {status}")
def on_actuator_connection(status, message):
print(f"[执行器] 连接状态: {status}")
sdk.set_connection_callback("sensor-1", on_sensor1_connection)
sdk.set_connection_callback("sensor-2", on_sensor2_connection)
sdk.set_connection_callback("actuator-1", on_actuator_connection)
# 连接所有终端
await sdk.connect_all()
# 保持运行
await asyncio.sleep(3600)
await sdk.disconnect_all()
asyncio.run(main())动态终端管理
运行时动态添加和移除终端:
import asyncio
from streamind_sdk import SDK, Config
class TerminalManager:
def __init__(self):
self.sdk = SDK()
self.terminals = {}
async def add_terminal(self, terminal_id, config):
"""动态添加终端"""
try:
# 注册终端
self.sdk.register_terminal(terminal_id, config)
# 设置回调
def on_directive(directive):
print(f"[{terminal_id}] 收到指令: {directive.name}")
self.sdk.set_directive_callback(terminal_id, on_directive)
# 连接
await self.sdk.connect(terminal_id)
# 记录终端
self.terminals[terminal_id] = config
print(f"终端 {terminal_id} 已添加并连接")
return True
except Exception as e:
print(f"添加终端 {terminal_id} 失败: {e}")
return False
async def remove_terminal(self, terminal_id):
"""动态移除终端"""
try:
if terminal_id in self.terminals:
# 断开连接
await self.sdk.disconnect(terminal_id)
# 移除记录
del self.terminals[terminal_id]
print(f"终端 {terminal_id} 已移除")
return True
else:
print(f"终端 {terminal_id} 不存在")
return False
except Exception as e:
print(f"移除终端 {terminal_id} 失败: {e}")
return False
async def send_to_terminal(self, terminal_id, signal):
"""向指定终端发送信号"""
try:
await self.sdk.send_signal(terminal_id, signal)
return True
except Exception as e:
print(f"发送失败: {e}")
return False
def list_terminals(self):
"""列出所有终端"""
return list(self.terminals.keys())
async def close_all(self):
"""关闭所有终端"""
await self.sdk.disconnect_all()
self.terminals.clear()
# 使用示例
async def main():
manager = TerminalManager()
# 动态添加终端
config1 = Config(...)
await manager.add_terminal("sensor-1", config1)
config2 = Config(...)
await manager.add_terminal("sensor-2", config2)
# 列出终端
print(f"当前终端: {manager.list_terminals()}")
# 发送数据
from streamind_sdk import Signal
signal = Signal("sensor.data")
signal.get_payload().set_number("value", 25.5)
await manager.send_to_terminal("sensor-1", signal)
# 移除终端
await manager.remove_terminal("sensor-2")
# 清理
await manager.close_all()
asyncio.run(main())网关场景
作为网关管理多个下游设备:
import asyncio
from streamind_sdk import SDK, Config, Signal
class Gateway:
def __init__(self, gateway_config):
self.sdk = SDK()
self.devices = {}
async def register_device(self, device_id, device_config):
"""注册设备"""
self.sdk.register_terminal(device_id, device_config)
self.devices[device_id] = {
"config": device_config,
"connected": False
}
# 设置回调
def on_directive(directive):
self.handle_device_directive(device_id, directive)
def on_connection(status, message):
self.devices[device_id]["connected"] = (status == "connected")
print(f"设备 {device_id} 连接状态: {status}")
self.sdk.set_directive_callback(device_id, on_directive)
self.sdk.set_connection_callback(device_id, on_connection)
def handle_device_directive(self, device_id, directive):
"""处理设备指令"""
print(f"设备 {device_id} 收到指令: {directive.name}")
# 根据指令类型分发处理
if directive.name == "device.query":
asyncio.create_task(self.respond_to_query(device_id, directive))
elif directive.name == "device.control":
asyncio.create_task(self.control_device(device_id, directive))
async def respond_to_query(self, device_id, directive):
"""响应查询指令"""
signal = Signal("device.status")
signal.get_payload().set_string("status", "online")
signal.get_payload().set_boolean("connected",
self.devices[device_id]["connected"])
await self.sdk.send_signal(device_id, signal)
async def control_device(self, device_id, directive):
"""控制设备"""
payload = directive.get_payload()
action = payload.get_string("action")
print(f"执行设备 {device_id} 的动作: {action}")
# 执行控制逻辑...
async def start(self):
"""启动网关"""
# 连接所有设备
results = await self.sdk.connect_all()
for device_id, success in results.items():
status = "成功" if success else "失败"
print(f"设备 {device_id} 连接{status}")
async def broadcast_signal(self, signal):
"""广播信号到所有设备"""
tasks = []
for device_id in self.devices.keys():
task = self.sdk.send_signal(device_id, signal)
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
async def stop(self):
"""停止网关"""
await self.sdk.disconnect_all()
# 使用网关
async def main():
# 创建网关配置
gateway = Gateway(gateway_config=None)
# 注册设备
device1_config = Config(...)
await gateway.register_device("device-1", device1_config)
device2_config = Config(...)
await gateway.register_device("device-2", device2_config)
# 启动网关
await gateway.start()
# 广播信号
broadcast_signal = Signal("gateway.announcement")
broadcast_signal.get_payload().set_string("message", "Gateway online")
await gateway.broadcast_signal(broadcast_signal)
# 保持运行
await asyncio.sleep(3600)
# 停止网关
await gateway.stop()
asyncio.run(main())并发发送数据
同时向多个终端发送数据:
import asyncio
from streamind_sdk import SDK, Config, Signal
async def send_to_all_sensors(sdk, sensor_ids, data):
"""并发发送数据到所有传感器"""
tasks = []
for sensor_id in sensor_ids:
signal = Signal("sensor.data")
payload = signal.get_payload()
payload.set_number("value", data["value"])
payload.set_string("timestamp", data["timestamp"])
# 创建发送任务
task = sdk.send_signal(sensor_id, signal)
tasks.append(task)
# 并发执行所有发送任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 检查结果
for i, result in enumerate(results):
sensor_id = sensor_ids[i]
if isinstance(result, Exception):
print(f"{sensor_id}: 发送失败 - {result}")
else:
print(f"{sensor_id}: 发送成功")
async def main():
sdk = SDK()
# 注册多个传感器
sensor_ids = ["sensor-1", "sensor-2", "sensor-3", "sensor-4"]
for sensor_id in sensor_ids:
config = Config(device_id=sensor_id, ...)
sdk.register_terminal(sensor_id, config)
# 连接所有传感器
await sdk.connect_all()
# 并发发送数据
data = {
"value": 25.5,
"timestamp": "2024-01-01T00:00:00Z"
}
await send_to_all_sensors(sdk, sensor_ids, data)
await sdk.disconnect_all()
asyncio.run(main())终端状态监控
监控所有终端的连接状态:
import asyncio
from streamind_sdk import SDK, Config
class TerminalMonitor:
def __init__(self):
self.sdk = SDK()
self.terminal_status = {}
def register_with_monitoring(self, terminal_id, config):
"""注册终端并启用监控"""
self.sdk.register_terminal(terminal_id, config)
self.terminal_status[terminal_id] = {
"connected": False,
"last_error": None
}
def on_connection(status, message):
self.terminal_status[terminal_id]["connected"] = (status == "connected")
if status == "error":
self.terminal_status[terminal_id]["last_error"] = message
print(f"[监控] {terminal_id} 状态: {status}")
self.sdk.set_connection_callback(terminal_id, on_connection)
def get_status_summary(self):
"""获取所有终端状态摘要"""
connected_count = sum(1 for s in self.terminal_status.values()
if s["connected"])
total_count = len(self.terminal_status)
return {
"total": total_count,
"connected": connected_count,
"disconnected": total_count - connected_count,
"details": self.terminal_status
}
async def check_and_reconnect(self):
"""检查并重连断开的终端"""
for terminal_id, status in self.terminal_status.items():
if not status["connected"]:
try:
print(f"尝试重连 {terminal_id}...")
await self.sdk.connect(terminal_id)
except Exception as e:
print(f"重连 {terminal_id} 失败: {e}")
# 使用监控
async def main():
monitor = TerminalMonitor()
# 注册终端
config1 = Config(...)
monitor.register_with_monitoring("sensor-1", config1)
config2 = Config(...)
monitor.register_with_monitoring("sensor-2", config2)
# 连接
await monitor.sdk.connect_all()
# 定期检查状态
while True:
await asyncio.sleep(30) # 每30秒检查一次
summary = monitor.get_status_summary()
print(f"\n状态摘要: {summary['connected']}/{summary['total']} 已连接")
# 尝试重连
await monitor.check_and_reconnect()
asyncio.run(main())最佳实践
- 资源管理 - 使用单个 SDK 实例管理所有终端
- 错误隔离 - 单个终端的错误不应影响其他终端
- 并发控制 - 使用 asyncio.gather 并发操作多个终端
- 状态监控 - 实时监控所有终端的连接状态
- 优雅关闭 - 确保所有终端正确断开连接
下一步
Last updated on