Skip to Content

多终端管理

使用单个 SDK 实例管理多个设备连接,适用于网关、集中控制器等场景。

基础多终端连接

import { SDK, Config, Signal } from 'streamind-sdk'; async function main() { // 创建 SDK 实例 const sdk = new SDK(); // 为不同设备创建配置 const config1 = new Config({ deviceId: "sensor-001", deviceType: "temperature", endpoint: "wss://your-platform.com/signals", tenantId: "your-tenant-id", productId: "product-sensors", productKey: "your-secret-key" }); const config2 = new Config({ deviceId: "sensor-002", deviceType: "humidity", endpoint: "wss://your-platform.com/signals", tenantId: "your-tenant-id", productId: "product-sensors", productKey: "your-secret-key" }); const config3 = new Config({ deviceId: "actuator-001", deviceType: "motor", endpoint: "wss://your-platform.com/signals", tenantId: "your-tenant-id", productId: "product-actuators", productKey: "your-actuators-key" }); // 注册所有终端 sdk.registerTerminal("sensor-1", config1); sdk.registerTerminal("sensor-2", config2); sdk.registerTerminal("actuator-1", config3); console.log("注册了 3 个终端"); // 批量连接所有终端 const results = await sdk.connectAll(); // 检查连接结果 for (const [terminalId, success] of Object.entries(results)) { const status = success ? "成功" : "失败"; console.log(`${terminalId}: 连接${status}`); } // 向不同终端发送信号 // 发送温度数据 const signal1 = new Signal("sensor.temperature"); signal1.getPayload().setNumber("value", 25.5); await sdk.sendSignal("sensor-1", signal1); console.log("温度数据已发送"); // 发送湿度数据 const signal2 = new Signal("sensor.humidity"); signal2.getPayload().setNumber("value", 60.0); await sdk.sendSignal("sensor-2", signal2); console.log("湿度数据已发送"); // 控制执行器 const signal3 = new Signal("actuator.control"); signal3.getPayload().setString("action", "start"); await sdk.sendSignal("actuator-1", signal3); console.log("执行器控制信号已发送"); // 保持连接 await new Promise(resolve => setTimeout(resolve, 60000)); // 批量断开所有终端 await sdk.disconnectAll(); console.log("所有终端已断开"); } main().catch(console.error);

为每个终端设置独立回调

import { SDK, Config, Directive } from 'streamind-sdk'; async function main() { const sdk = new SDK(); // 注册终端 sdk.registerTerminal("sensor-1", config1); sdk.registerTerminal("sensor-2", config2); sdk.registerTerminal("actuator-1", config3); // 为每个终端设置独立的指令回调 function onSensor1Directive(directive: Directive) { console.log(`[传感器1] 收到指令: ${directive.name}`); // 处理传感器1的指令 } function onSensor2Directive(directive: Directive) { console.log(`[传感器2] 收到指令: ${directive.name}`); // 处理传感器2的指令 } function onActuatorDirective(directive: Directive) { console.log(`[执行器] 收到指令: ${directive.name}`); const payload = directive.getPayload(); const action = payload.getString("action"); // 执行控制动作 } sdk.setDirectiveCallback("sensor-1", onSensor1Directive); sdk.setDirectiveCallback("sensor-2", onSensor2Directive); sdk.setDirectiveCallback("actuator-1", onActuatorDirective); // 为每个终端设置独立的连接状态回调 function onSensor1Connection(status: string, message: string) { console.log(`[传感器1] 连接状态: ${status}`); } function onSensor2Connection(status: string, message: string) { console.log(`[传感器2] 连接状态: ${status}`); } function onActuatorConnection(status: string, message: string) { console.log(`[执行器] 连接状态: ${status}`); } sdk.setConnectionCallback("sensor-1", onSensor1Connection); sdk.setConnectionCallback("sensor-2", onSensor2Connection); sdk.setConnectionCallback("actuator-1", onActuatorConnection); // 连接所有终端 await sdk.connectAll(); // 保持运行 await new Promise(resolve => setTimeout(resolve, 3600000)); await sdk.disconnectAll(); } main().catch(console.error);

动态终端管理

运行时动态添加和移除终端:

import { SDK, Config, Signal } from 'streamind-sdk'; class TerminalManager { private sdk: SDK; private terminals: Map<string, Config>; constructor() { this.sdk = new SDK(); this.terminals = new Map(); } async addTerminal(terminalId: string, config: Config): Promise<boolean> { // 动态添加终端 try { // 注册终端 this.sdk.registerTerminal(terminalId, config); // 设置回调 this.sdk.setDirectiveCallback(terminalId, (directive) => { console.log(`[${terminalId}] 收到指令: ${directive.name}`); }); // 连接 await this.sdk.connect(terminalId); // 记录终端 this.terminals.set(terminalId, config); console.log(`终端 ${terminalId} 已添加并连接`); return true; } catch (error) { console.error(`添加终端 ${terminalId} 失败: ${error}`); return false; } } async removeTerminal(terminalId: string): Promise<boolean> { // 动态移除终端 try { if (this.terminals.has(terminalId)) { // 断开连接 await this.sdk.disconnect(terminalId); // 移除记录 this.terminals.delete(terminalId); console.log(`终端 ${terminalId} 已移除`); return true; } else { console.log(`终端 ${terminalId} 不存在`); return false; } } catch (error) { console.error(`移除终端 ${terminalId} 失败: ${error}`); return false; } } async sendToTerminal(terminalId: string, signal: Signal): Promise<boolean> { // 向指定终端发送信号 try { await this.sdk.sendSignal(terminalId, signal); return true; } catch (error) { console.error(`发送失败: ${error}`); return false; } } listTerminals(): string[] { // 列出所有终端 return Array.from(this.terminals.keys()); } async closeAll(): Promise<void> { // 关闭所有终端 await this.sdk.disconnectAll(); this.terminals.clear(); } } // 使用示例 async function main() { const manager = new TerminalManager(); // 动态添加终端 const config1 = new Config({ // ... 配置参数 }); await manager.addTerminal("sensor-1", config1); const config2 = new Config({ // ... 配置参数 }); await manager.addTerminal("sensor-2", config2); // 列出终端 console.log(`当前终端: ${manager.listTerminals().join(', ')}`); // 发送数据 const signal = new Signal("sensor.data"); signal.getPayload().setNumber("value", 25.5); await manager.sendToTerminal("sensor-1", signal); // 移除终端 await manager.removeTerminal("sensor-2"); // 清理 await manager.closeAll(); } main().catch(console.error);

网关场景

作为网关管理多个下游设备:

import { SDK, Config, Signal, Directive } from 'streamind-sdk'; class Gateway { private sdk: SDK; private devices: Map<string, { config: Config; connected: boolean }>; constructor() { this.sdk = new SDK(); this.devices = new Map(); } async registerDevice(deviceId: string, deviceConfig: Config): Promise<void> { // 注册设备 this.sdk.registerTerminal(deviceId, deviceConfig); this.devices.set(deviceId, { config: deviceConfig, connected: false }); // 设置回调 this.sdk.setDirectiveCallback(deviceId, (directive) => { this.handleDeviceDirective(deviceId, directive); }); this.sdk.setConnectionCallback(deviceId, (status, message) => { const device = this.devices.get(deviceId); if (device) { device.connected = (status === "connected"); } console.log(`设备 ${deviceId} 连接状态: ${status}`); }); } handleDeviceDirective(deviceId: string, directive: Directive): void { // 处理设备指令 console.log(`设备 ${deviceId} 收到指令: ${directive.name}`); // 根据指令类型分发处理 if (directive.name === "device.query") { this.respondToQuery(deviceId, directive); } else if (directive.name === "device.control") { this.controlDevice(deviceId, directive); } } async respondToQuery(deviceId: string, directive: Directive): Promise<void> { // 响应查询指令 const signal = new Signal("device.status"); const device = this.devices.get(deviceId); signal.getPayload().setString("status", "online"); signal.getPayload().setBoolean("connected", device?.connected || false); await this.sdk.sendSignal(deviceId, signal); } async controlDevice(deviceId: string, directive: Directive): Promise<void> { // 控制设备 const payload = directive.getPayload(); const action = payload.getString("action"); console.log(`执行设备 ${deviceId} 的动作: ${action}`); // 执行控制逻辑... } async start(): Promise<void> { // 启动网关 const results = await this.sdk.connectAll(); for (const [deviceId, success] of Object.entries(results)) { const status = success ? "成功" : "失败"; console.log(`设备 ${deviceId} 连接${status}`); } } async broadcastSignal(signal: Signal): Promise<void> { // 广播信号到所有设备 const tasks = Array.from(this.devices.keys()).map(deviceId => this.sdk.sendSignal(deviceId, signal).catch(error => { console.error(`广播到 ${deviceId} 失败: ${error}`); }) ); await Promise.all(tasks); } async stop(): Promise<void> { // 停止网关 await this.sdk.disconnectAll(); } } // 使用网关 async function main() { const gateway = new Gateway(); // 注册设备 const device1Config = new Config({ // ... 配置参数 }); await gateway.registerDevice("device-1", device1Config); const device2Config = new Config({ // ... 配置参数 }); await gateway.registerDevice("device-2", device2Config); // 启动网关 await gateway.start(); // 广播信号 const broadcastSignal = new Signal("gateway.announcement"); broadcastSignal.getPayload().setString("message", "Gateway online"); await gateway.broadcastSignal(broadcastSignal); // 保持运行 await new Promise(resolve => setTimeout(resolve, 3600000)); // 停止网关 await gateway.stop(); } main().catch(console.error);

并发发送数据

同时向多个终端发送数据:

import { SDK, Config, Signal } from 'streamind-sdk'; async function sendToAllSensors( sdk: SDK, sensorIds: string[], data: { value: number; timestamp: string } ): Promise<void> { // 并发发送数据到所有传感器 const tasks = sensorIds.map(async (sensorId) => { const signal = new Signal("sensor.data"); const payload = signal.getPayload(); payload.setNumber("value", data.value); payload.setString("timestamp", data.timestamp); try { await sdk.sendSignal(sensorId, signal); console.log(`${sensorId}: 发送成功`); } catch (error) { console.error(`${sensorId}: 发送失败 - ${error}`); } }); await Promise.all(tasks); } async function main() { const sdk = new SDK(); // 注册多个传感器 const sensorIds = ["sensor-1", "sensor-2", "sensor-3", "sensor-4"]; for (const sensorId of sensorIds) { const config = new Config({ deviceId: sensorId, // ... 其他配置参数 }); sdk.registerTerminal(sensorId, config); } // 连接所有传感器 await sdk.connectAll(); // 并发发送数据 const data = { value: 25.5, timestamp: new Date().toISOString() }; await sendToAllSensors(sdk, sensorIds, data); await sdk.disconnectAll(); } main().catch(console.error);

终端状态监控

监控所有终端的连接状态:

import { SDK, Config } from 'streamind-sdk'; interface TerminalStatus { connected: boolean; lastError: string | null; } class TerminalMonitor { private sdk: SDK; private terminalStatus: Map<string, TerminalStatus>; constructor() { this.sdk = new SDK(); this.terminalStatus = new Map(); } registerWithMonitoring(terminalId: string, config: Config): void { // 注册终端并启用监控 this.sdk.registerTerminal(terminalId, config); this.terminalStatus.set(terminalId, { connected: false, lastError: null }); this.sdk.setConnectionCallback(terminalId, (status, message) => { const termStatus = this.terminalStatus.get(terminalId); if (termStatus) { termStatus.connected = (status === "connected"); if (status === "error") { termStatus.lastError = message; } } console.log(`[监控] ${terminalId} 状态: ${status}`); }); } getStatusSummary(): { total: number; connected: number; disconnected: number; details: Map<string, TerminalStatus>; } { // 获取所有终端状态摘要 const connectedCount = Array.from(this.terminalStatus.values()) .filter(s => s.connected).length; const total = this.terminalStatus.size; return { total, connected: connectedCount, disconnected: total - connectedCount, details: this.terminalStatus }; } async checkAndReconnect(): Promise<void> { // 检查并重连断开的终端 for (const [terminalId, status] of this.terminalStatus.entries()) { if (!status.connected) { try { console.log(`尝试重连 ${terminalId}...`); await this.sdk.connect(terminalId); } catch (error) { console.error(`重连 ${terminalId} 失败: ${error}`); } } } } getSDK(): SDK { return this.sdk; } } // 使用监控 async function main() { const monitor = new TerminalMonitor(); // 注册终端 const config1 = new Config({ // ... 配置参数 }); monitor.registerWithMonitoring("sensor-1", config1); const config2 = new Config({ // ... 配置参数 }); monitor.registerWithMonitoring("sensor-2", config2); // 连接 await monitor.getSDK().connectAll(); // 定期检查状态 const intervalId = setInterval(async () => { const summary = monitor.getStatusSummary(); console.log(`\n状态摘要: ${summary.connected}/${summary.total} 已连接`); // 尝试重连 await monitor.checkAndReconnect(); }, 30000); // 每30秒检查一次 // 运行一段时间后清理 await new Promise(resolve => setTimeout(resolve, 300000)); clearInterval(intervalId); } main().catch(console.error);

最佳实践

  1. 资源管理 - 使用单个 SDK 实例管理所有终端
  2. 错误隔离 - 单个终端的错误不应影响其他终端
  3. 并发控制 - 使用 Promise.all 并发操作多个终端
  4. 状态监控 - 实时监控所有终端的连接状态
  5. 优雅关闭 - 确保所有终端正确断开连接

下一步

Last updated on