多终端管理
使用单个 SDK 实例管理多个设备连接,适用于网关、集中控制器等场景。
基础多终端连接
import { SDK, Config, Signal } from 'streamind-sdk';
/**
 * Basic multi-terminal example.
 *
 * Registers three terminals on one SDK instance, connects them with a single
 * `connectAll()` call, sends one signal to each terminal, keeps the
 * connections open for a minute and finally disconnects everything.
 *
 * The connect/send phase runs inside `try/finally` so that `disconnectAll()`
 * is still executed when connecting or sending throws.
 */
async function main() {
  // A single SDK instance manages every terminal.
  const sdk = new SDK();

  // One configuration per device.
  const config1 = new Config({
    deviceId: "sensor-001",
    deviceType: "temperature",
    endpoint: "wss://your-platform.com/signals",
    tenantId: "your-tenant-id",
    productId: "product-sensors",
    productKey: "your-secret-key"
  });
  const config2 = new Config({
    deviceId: "sensor-002",
    deviceType: "humidity",
    endpoint: "wss://your-platform.com/signals",
    tenantId: "your-tenant-id",
    productId: "product-sensors",
    productKey: "your-secret-key"
  });
  const config3 = new Config({
    deviceId: "actuator-001",
    deviceType: "motor",
    endpoint: "wss://your-platform.com/signals",
    tenantId: "your-tenant-id",
    productId: "product-actuators",
    productKey: "your-actuators-key"
  });

  // Register every terminal under its own terminal id.
  sdk.registerTerminal("sensor-1", config1);
  sdk.registerTerminal("sensor-2", config2);
  sdk.registerTerminal("actuator-1", config3);
  console.log("注册了 3 个终端");

  try {
    // Connect all registered terminals in one call.
    const results = await sdk.connectAll();

    // Report the per-terminal connection result.
    for (const [terminalId, success] of Object.entries(results)) {
      const status = success ? "成功" : "失败";
      console.log(`${terminalId}: 连接${status}`);
    }

    // Send a temperature reading through the first sensor.
    const signal1 = new Signal("sensor.temperature");
    signal1.getPayload().setNumber("value", 25.5);
    await sdk.sendSignal("sensor-1", signal1);
    console.log("温度数据已发送");

    // Send a humidity reading through the second sensor.
    const signal2 = new Signal("sensor.humidity");
    signal2.getPayload().setNumber("value", 60.0);
    await sdk.sendSignal("sensor-2", signal2);
    console.log("湿度数据已发送");

    // Send a control signal through the actuator terminal.
    const signal3 = new Signal("actuator.control");
    signal3.getPayload().setString("action", "start");
    await sdk.sendSignal("actuator-1", signal3);
    console.log("执行器控制信号已发送");

    // Keep the connections open for 60 seconds.
    await new Promise(resolve => setTimeout(resolve, 60000));
  } finally {
    // Always disconnect, even if connecting or sending failed above,
    // so no connection is left open when main() rejects.
    await sdk.disconnectAll();
    console.log("所有终端已断开");
  }
}
main().catch(console.error);
为每个终端设置独立回调
import { SDK, Config, Directive } from 'streamind-sdk';
/**
 * Per-terminal callback example.
 *
 * Registers three terminals, attaches a separate directive callback and a
 * separate connection-status callback to each of them, connects everything
 * and keeps running for one hour before disconnecting.
 */
async function main() {
  const sdk = new SDK();

  // Device configurations. These must be declared before registration;
  // the values are placeholders to be replaced with real credentials.
  const config1 = new Config({
    deviceId: "sensor-001",
    deviceType: "temperature",
    endpoint: "wss://your-platform.com/signals",
    tenantId: "your-tenant-id",
    productId: "product-sensors",
    productKey: "your-secret-key"
  });
  const config2 = new Config({
    deviceId: "sensor-002",
    deviceType: "humidity",
    endpoint: "wss://your-platform.com/signals",
    tenantId: "your-tenant-id",
    productId: "product-sensors",
    productKey: "your-secret-key"
  });
  const config3 = new Config({
    deviceId: "actuator-001",
    deviceType: "motor",
    endpoint: "wss://your-platform.com/signals",
    tenantId: "your-tenant-id",
    productId: "product-actuators",
    productKey: "your-actuators-key"
  });

  // Register the terminals.
  sdk.registerTerminal("sensor-1", config1);
  sdk.registerTerminal("sensor-2", config2);
  sdk.registerTerminal("actuator-1", config3);

  // One directive callback per terminal.
  function onSensor1Directive(directive: Directive) {
    console.log(`[传感器1] 收到指令: ${directive.name}`);
    // Handle directives addressed to sensor 1 here.
  }
  function onSensor2Directive(directive: Directive) {
    console.log(`[传感器2] 收到指令: ${directive.name}`);
    // Handle directives addressed to sensor 2 here.
  }
  function onActuatorDirective(directive: Directive) {
    console.log(`[执行器] 收到指令: ${directive.name}`);
    const payload = directive.getPayload();
    const action = payload.getString("action");
    // Placeholder: perform the control action named by `action` here.
  }
  sdk.setDirectiveCallback("sensor-1", onSensor1Directive);
  sdk.setDirectiveCallback("sensor-2", onSensor2Directive);
  sdk.setDirectiveCallback("actuator-1", onActuatorDirective);

  // One connection-status callback per terminal.
  function onSensor1Connection(status: string, message: string) {
    console.log(`[传感器1] 连接状态: ${status}`);
  }
  function onSensor2Connection(status: string, message: string) {
    console.log(`[传感器2] 连接状态: ${status}`);
  }
  function onActuatorConnection(status: string, message: string) {
    console.log(`[执行器] 连接状态: ${status}`);
  }
  sdk.setConnectionCallback("sensor-1", onSensor1Connection);
  sdk.setConnectionCallback("sensor-2", onSensor2Connection);
  sdk.setConnectionCallback("actuator-1", onActuatorConnection);

  try {
    // Connect every registered terminal.
    await sdk.connectAll();

    // Keep running for one hour so callbacks can fire.
    await new Promise(resolve => setTimeout(resolve, 3600000));
  } finally {
    // Disconnect even when connectAll() throws.
    await sdk.disconnectAll();
  }
}
main().catch(console.error);
动态终端管理
运行时动态添加和移除终端:
import { SDK, Config, Signal } from 'streamind-sdk';
/**
 * Adds and removes terminals at runtime on top of a single SDK instance.
 *
 * The manager keeps its own map of terminal id -> config for every terminal
 * that was added and connected successfully; `listTerminals()` reports the
 * keys of that map.
 */
class TerminalManager {
  private sdk: SDK;
  private terminals: Map<string, Config>;

  constructor() {
    this.sdk = new SDK();
    this.terminals = new Map();
  }

  /**
   * Registers a terminal, attaches a logging directive callback and connects it.
   *
   * @param terminalId Identifier to register the terminal under.
   * @param config Configuration passed to the SDK for this terminal.
   * @returns `true` once the terminal is connected and recorded; `false` if
   *          registration or connection threw (the error is logged).
   */
  async addTerminal(terminalId: string, config: Config): Promise<boolean> {
    const { sdk } = this;
    try {
      sdk.registerTerminal(terminalId, config);
      sdk.setDirectiveCallback(terminalId, (directive) => {
        console.log(`[${terminalId}] 收到指令: ${directive.name}`);
      });
      await sdk.connect(terminalId);
    } catch (error) {
      console.error(`添加终端 ${terminalId} 失败: ${error}`);
      return false;
    }

    // Only terminals that connected successfully are recorded.
    this.terminals.set(terminalId, config);
    console.log(`终端 ${terminalId} 已添加并连接`);
    return true;
  }

  /**
   * Disconnects a previously added terminal and forgets it.
   *
   * @param terminalId Identifier of the terminal to remove.
   * @returns `true` when the terminal was disconnected and removed; `false`
   *          when it is unknown to this manager or disconnecting threw.
   */
  async removeTerminal(terminalId: string): Promise<boolean> {
    if (!this.terminals.has(terminalId)) {
      console.log(`终端 ${terminalId} 不存在`);
      return false;
    }

    try {
      await this.sdk.disconnect(terminalId);
    } catch (error) {
      console.error(`移除终端 ${terminalId} 失败: ${error}`);
      return false;
    }

    this.terminals.delete(terminalId);
    console.log(`终端 ${terminalId} 已移除`);
    return true;
  }

  /**
   * Sends a signal through the given terminal.
   *
   * @returns `true` if the SDK call completed, `false` if it threw (logged).
   */
  async sendToTerminal(terminalId: string, signal: Signal): Promise<boolean> {
    try {
      await this.sdk.sendSignal(terminalId, signal);
    } catch (error) {
      console.error(`发送失败: ${error}`);
      return false;
    }
    return true;
  }

  /** Returns the ids of all terminals currently recorded by this manager. */
  listTerminals(): string[] {
    return [...this.terminals.keys()];
  }

  /** Disconnects every terminal via the SDK and clears the local record. */
  async closeAll(): Promise<void> {
    await this.sdk.disconnectAll();
    this.terminals.clear();
  }
}
// 使用示例
/**
 * Usage example for TerminalManager: add two terminals at runtime, list them,
 * send one signal through the first, remove the second, then close everything.
 */
async function main() {
  const terminalManager = new TerminalManager();

  // Add the first terminal on the fly.
  const firstConfig = new Config({
    // ... configuration parameters
  });
  await terminalManager.addTerminal("sensor-1", firstConfig);

  // Add the second terminal on the fly.
  const secondConfig = new Config({
    // ... configuration parameters
  });
  await terminalManager.addTerminal("sensor-2", secondConfig);

  // Show which terminals the manager currently tracks.
  const activeIds = terminalManager.listTerminals().join(', ');
  console.log(`当前终端: ${activeIds}`);

  // Send one data signal through the first terminal.
  const reading = new Signal("sensor.data");
  const readingPayload = reading.getPayload();
  readingPayload.setNumber("value", 25.5);
  await terminalManager.sendToTerminal("sensor-1", reading);

  // Remove the second terminal again.
  await terminalManager.removeTerminal("sensor-2");

  // Clean up: disconnect whatever is left.
  await terminalManager.closeAll();
}
main().catch(console.error);
网关场景
作为网关管理多个下游设备:
import { SDK, Config, Signal, Directive } from 'streamind-sdk';
/**
 * Gateway that manages several downstream devices through one SDK instance.
 *
 * Each device is registered as an SDK terminal. The gateway tracks a
 * `connected` flag per device (updated from the connection callback) and
 * dispatches incoming directives by name.
 */
class Gateway {
  private sdk: SDK;
  private devices: Map<string, { config: Config; connected: boolean }>;

  constructor() {
    this.sdk = new SDK();
    this.devices = new Map();
  }

  /**
   * Registers a device with the SDK and wires up its callbacks.
   *
   * @param deviceId Identifier used both as SDK terminal id and as map key.
   * @param deviceConfig Configuration handed to the SDK for this device.
   */
  async registerDevice(deviceId: string, deviceConfig: Config): Promise<void> {
    this.sdk.registerTerminal(deviceId, deviceConfig);
    this.devices.set(deviceId, {
      config: deviceConfig,
      connected: false
    });

    // Route directives for this device through the gateway dispatcher.
    this.sdk.setDirectiveCallback(deviceId, (directive) => {
      this.handleDeviceDirective(deviceId, directive);
    });

    // Mirror the connection status into the local device record.
    this.sdk.setConnectionCallback(deviceId, (status, message) => {
      const device = this.devices.get(deviceId);
      if (device) {
        device.connected = (status === "connected");
      }
      console.log(`设备 ${deviceId} 连接状态: ${status}`);
    });
  }

  /**
   * Dispatches a directive to the matching handler based on its name.
   *
   * The handlers are async while this method is a synchronous callback, so
   * the returned promise gets an explicit `.catch`: a failing handler is
   * logged instead of surfacing as an unhandled promise rejection.
   */
  handleDeviceDirective(deviceId: string, directive: Directive): void {
    console.log(`设备 ${deviceId} 收到指令: ${directive.name}`);

    let handling: Promise<void> | undefined;
    if (directive.name === "device.query") {
      handling = this.respondToQuery(deviceId, directive);
    } else if (directive.name === "device.control") {
      handling = this.controlDevice(deviceId, directive);
    }

    handling?.catch((error: unknown) => {
      console.error(`处理设备 ${deviceId} 的指令 ${directive.name} 失败: ${error}`);
    });
  }

  /**
   * Answers a query directive by sending a "device.status" signal that carries
   * the locally tracked `connected` flag of the device.
   */
  async respondToQuery(deviceId: string, directive: Directive): Promise<void> {
    const signal = new Signal("device.status");
    const device = this.devices.get(deviceId);
    signal.getPayload().setString("status", "online");
    // Unknown devices are reported as not connected.
    signal.getPayload().setBoolean("connected", device?.connected ?? false);
    await this.sdk.sendSignal(deviceId, signal);
  }

  /** Reads the "action" field of a control directive and logs it. */
  async controlDevice(deviceId: string, directive: Directive): Promise<void> {
    const payload = directive.getPayload();
    const action = payload.getString("action");
    console.log(`执行设备 ${deviceId} 的动作: ${action}`);
    // Placeholder: the actual control logic goes here.
  }

  /** Connects all registered devices and logs the per-device result. */
  async start(): Promise<void> {
    const results = await this.sdk.connectAll();
    for (const [deviceId, success] of Object.entries(results)) {
      const status = success ? "成功" : "失败";
      console.log(`设备 ${deviceId} 连接${status}`);
    }
  }

  /**
   * Sends the same signal to every registered device concurrently.
   * A failure for one device is logged and does not affect the others.
   */
  async broadcastSignal(signal: Signal): Promise<void> {
    const tasks = Array.from(this.devices.keys()).map(deviceId =>
      this.sdk.sendSignal(deviceId, signal).catch(error => {
        console.error(`广播到 ${deviceId} 失败: ${error}`);
      })
    );
    await Promise.all(tasks);
  }

  /** Disconnects all devices. */
  async stop(): Promise<void> {
    await this.sdk.disconnectAll();
  }
}
// 使用网关
/**
 * Usage example for Gateway: register two devices, start the gateway,
 * broadcast one announcement, stay up for an hour, then stop.
 */
async function main() {
  const gw = new Gateway();

  // Register the first downstream device.
  const firstDeviceConfig = new Config({
    // ... configuration parameters
  });
  await gw.registerDevice("device-1", firstDeviceConfig);

  // Register the second downstream device.
  const secondDeviceConfig = new Config({
    // ... configuration parameters
  });
  await gw.registerDevice("device-2", secondDeviceConfig);

  // Bring the gateway up (connects all registered devices).
  await gw.start();

  // Announce to every device that the gateway is online.
  const announcement = new Signal("gateway.announcement");
  const announcementPayload = announcement.getPayload();
  announcementPayload.setString("message", "Gateway online");
  await gw.broadcastSignal(announcement);

  // Keep running for one hour.
  const oneHourMs = 3600000;
  await new Promise(resolve => setTimeout(resolve, oneHourMs));

  // Shut the gateway down.
  await gw.stop();
}
main().catch(console.error);
并发发送数据
同时向多个终端发送数据:
import { SDK, Config, Signal } from 'streamind-sdk';
/**
 * Sends the same reading to every listed sensor terminal concurrently.
 *
 * A separate "sensor.data" signal is built for each sensor. A failure for one
 * sensor is logged and does not prevent the others from being sent.
 *
 * @param sdk SDK instance the sensors are registered on.
 * @param sensorIds Terminal ids to send to.
 * @param data Reading to send: numeric `value` and string `timestamp`.
 */
async function sendToAllSensors(
  sdk: SDK,
  sensorIds: string[],
  data: { value: number; timestamp: string }
): Promise<void> {
  // Builds and sends one signal; never rejects because errors are logged here.
  const deliver = async (sensorId: string): Promise<void> => {
    const signal = new Signal("sensor.data");
    const body = signal.getPayload();
    body.setNumber("value", data.value);
    body.setString("timestamp", data.timestamp);

    try {
      await sdk.sendSignal(sensorId, signal);
      console.log(`${sensorId}: 发送成功`);
    } catch (error) {
      console.error(`${sensorId}: 发送失败 - ${error}`);
    }
  };

  // Start every send before waiting on any of them.
  const pending: Promise<void>[] = [];
  for (const sensorId of sensorIds) {
    pending.push(deliver(sensorId));
  }
  await Promise.all(pending);
}
/**
 * Concurrent-send example: registers four sensors, connects them, sends one
 * reading to all of them at once and disconnects.
 */
async function main() {
  const sdk = new SDK();

  // Register one terminal per sensor id.
  const sensorIds = ["sensor-1", "sensor-2", "sensor-3", "sensor-4"];
  sensorIds.forEach((sensorId) => {
    const sensorConfig = new Config({
      deviceId: sensorId,
      // ... remaining configuration parameters
    });
    sdk.registerTerminal(sensorId, sensorConfig);
  });

  // Connect every sensor.
  await sdk.connectAll();

  // Send the same reading to all sensors concurrently.
  const timestamp = new Date().toISOString();
  const reading = { value: 25.5, timestamp };
  await sendToAllSensors(sdk, sensorIds, reading);

  await sdk.disconnectAll();
}
main().catch(console.error);
终端状态监控
监控所有终端的连接状态:
import { SDK, Config } from 'streamind-sdk';
/** Connection state tracked for a single terminal. */
interface TerminalStatus {
// Whether the terminal is currently considered connected.
connected: boolean;
// Error message recorded for the terminal, or null if none has been recorded.
lastError: string | null;
}
/**
 * Tracks the connection state of every terminal registered through it and
 * can retry connecting the ones that are not connected.
 */
class TerminalMonitor {
  private sdk: SDK;
  private terminalStatus: Map<string, TerminalStatus>;

  constructor() {
    this.sdk = new SDK();
    this.terminalStatus = new Map();
  }

  /**
   * Registers a terminal with the SDK and starts tracking its status.
   *
   * The status starts as disconnected with no error and is updated from the
   * SDK connection callback.
   */
  registerWithMonitoring(terminalId: string, config: Config): void {
    this.sdk.registerTerminal(terminalId, config);

    const initialStatus: TerminalStatus = { connected: false, lastError: null };
    this.terminalStatus.set(terminalId, initialStatus);

    this.sdk.setConnectionCallback(terminalId, (status, message) => {
      // Look the entry up on every event rather than capturing it.
      const tracked = this.terminalStatus.get(terminalId);
      if (tracked) {
        tracked.connected = status === "connected";
        if (status === "error") {
          tracked.lastError = message;
        }
      }
      console.log(`[监控] ${terminalId} 状态: ${status}`);
    });
  }

  /**
   * Summarises the tracked terminals.
   *
   * @returns Total count, connected count, disconnected count and the
   *          underlying status map itself (not a copy).
   */
  getStatusSummary(): {
    total: number;
    connected: number;
    disconnected: number;
    details: Map<string, TerminalStatus>;
  } {
    let connected = 0;
    for (const entry of this.terminalStatus.values()) {
      if (entry.connected) {
        connected += 1;
      }
    }

    const total = this.terminalStatus.size;
    const disconnected = total - connected;
    return { total, connected, disconnected, details: this.terminalStatus };
  }

  /**
   * Tries to connect every terminal whose tracked status is not connected.
   * Terminals are processed one after another; a failed attempt is logged
   * and the loop continues with the next terminal.
   */
  async checkAndReconnect(): Promise<void> {
    for (const [terminalId, status] of this.terminalStatus) {
      if (status.connected) {
        continue;
      }
      try {
        console.log(`尝试重连 ${terminalId}...`);
        await this.sdk.connect(terminalId);
      } catch (error) {
        console.error(`重连 ${terminalId} 失败: ${error}`);
      }
    }
  }

  /** Exposes the underlying SDK instance. */
  getSDK(): SDK {
    return this.sdk;
  }
}
// 使用监控
/**
 * Usage example for TerminalMonitor: register two terminals with monitoring,
 * connect them, print a status summary and retry disconnected terminals
 * every 30 seconds for five minutes, then stop the timer and disconnect.
 */
async function main() {
  const monitor = new TerminalMonitor();
  const sdk = monitor.getSDK();

  // Register the terminals with status tracking.
  const config1 = new Config({
    // ... configuration parameters
  });
  monitor.registerWithMonitoring("sensor-1", config1);
  const config2 = new Config({
    // ... configuration parameters
  });
  monitor.registerWithMonitoring("sensor-2", config2);

  // Connect everything.
  await sdk.connectAll();

  // Periodic status check, every 30 seconds.
  const intervalId = setInterval(async () => {
    const summary = monitor.getStatusSummary();
    console.log(`\n状态摘要: ${summary.connected}/${summary.total} 已连接`);
    // Retry terminals that are not connected.
    await monitor.checkAndReconnect();
  }, 30000);

  try {
    // Run for five minutes.
    await new Promise(resolve => setTimeout(resolve, 300000));
  } finally {
    // Stop the periodic check first so it cannot trigger a reconnect after
    // shutdown begins, then close all connections.
    clearInterval(intervalId);
    await sdk.disconnectAll();
  }
}
main().catch(console.error);
最佳实践
- 资源管理 - 使用单个 SDK 实例管理所有终端
- 错误隔离 - 单个终端的错误不应影响其他终端
- 并发控制 - 使用 Promise.all 并发操作多个终端
- 状态监控 - 实时监控所有终端的连接状态
- 优雅关闭 - 确保所有终端正确断开连接
下一步
Last updated on