Skip to Content

多终端管理

使用单个 SDK 实例管理多个设备连接,适用于网关、集中控制器等场景。

基础多终端连接

import com.streamind.sdk.*; import java.util.Map; public class MultiTerminalExample { public static void main(String[] args) { // 创建 SDK 实例 SDK sdk = new SDK(); // 为不同设备创建配置 Config config1 = new Config.Builder() .deviceId("sensor-001") .deviceType("temperature") .endpoint("wss://your-platform.com/signals") .tenantId("your-tenant-id") .productId("product-sensors") .productKey("your-secret-key") .build(); Config config2 = new Config.Builder() .deviceId("sensor-002") .deviceType("humidity") .endpoint("wss://your-platform.com/signals") .tenantId("your-tenant-id") .productId("product-sensors") .productKey("your-secret-key") .build(); Config config3 = new Config.Builder() .deviceId("actuator-001") .deviceType("motor") .endpoint("wss://your-platform.com/signals") .tenantId("your-tenant-id") .productId("product-actuators") .productKey("your-actuators-key") .build(); // 注册所有终端 sdk.registerTerminal("sensor-1", config1); sdk.registerTerminal("sensor-2", config2); sdk.registerTerminal("actuator-1", config3); System.out.println("注册了 3 个终端"); try { // 批量连接所有终端 Map<String, Boolean> results = sdk.connectAll(); // 检查连接结果 for (Map.Entry<String, Boolean> entry : results.entrySet()) { String status = entry.getValue() ? "成功" : "失败"; System.out.println(entry.getKey() + ": 连接" + status); } // 向不同终端发送信号 // 发送温度数据 Signal signal1 = new Signal("sensor.temperature"); signal1.getPayload().setNumber("value", 25.5); sdk.sendSignal("sensor-1", signal1); System.out.println("温度数据已发送"); // 发送湿度数据 Signal signal2 = new Signal("sensor.humidity"); signal2.getPayload().setNumber("value", 60.0); sdk.sendSignal("sensor-2", signal2); System.out.println("湿度数据已发送"); // 控制执行器 Signal signal3 = new Signal("actuator.control"); signal3.getPayload().setString("action", "start"); sdk.sendSignal("actuator-1", signal3); System.out.println("执行器控制信号已发送"); // 保持连接 Thread.sleep(60000); // 批量断开所有终端 sdk.disconnectAll(); System.out.println("所有终端已断开"); } catch (Exception e) { e.printStackTrace(); } } }

为每个终端设置独立回调

public class IndependentCallbacksExample { public static void main(String[] args) { SDK sdk = new SDK(); // 注册终端 sdk.registerTerminal("sensor-1", config1); sdk.registerTerminal("sensor-2", config2); sdk.registerTerminal("actuator-1", config3); // 为每个终端设置独立的指令回调 sdk.setDirectiveCallback("sensor-1", directive -> { System.out.println("[传感器1] 收到指令: " + directive.getName()); // 处理传感器1的指令 }); sdk.setDirectiveCallback("sensor-2", directive -> { System.out.println("[传感器2] 收到指令: " + directive.getName()); // 处理传感器2的指令 }); sdk.setDirectiveCallback("actuator-1", directive -> { System.out.println("[执行器] 收到指令: " + directive.getName()); Payload payload = directive.getPayload(); String action = payload.getString("action"); // 执行控制动作 }); // 为每个终端设置独立的连接状态回调 sdk.setConnectionCallback("sensor-1", (status, message) -> { System.out.println("[传感器1] 连接状态: " + status); }); sdk.setConnectionCallback("sensor-2", (status, message) -> { System.out.println("[传感器2] 连接状态: " + status); }); sdk.setConnectionCallback("actuator-1", (status, message) -> { System.out.println("[执行器] 连接状态: " + status); }); try { // 连接所有终端 sdk.connectAll(); // 保持运行 Thread.sleep(3600000); sdk.disconnectAll(); } catch (Exception e) { e.printStackTrace(); } } }

动态终端管理

运行时动态添加和移除终端:

import java.util.HashMap; import java.util.Map; import java.util.Set; public class TerminalManager { private SDK sdk; private Map<String, Config> terminals; public TerminalManager() { this.sdk = new SDK(); this.terminals = new HashMap<>(); } public boolean addTerminal(String terminalId, Config config) { // 动态添加终端 try { // 注册终端 sdk.registerTerminal(terminalId, config); // 设置回调 sdk.setDirectiveCallback(terminalId, directive -> { System.out.println("[" + terminalId + "] 收到指令: " + directive.getName()); }); // 连接 sdk.connect(terminalId); // 记录终端 terminals.put(terminalId, config); System.out.println("终端 " + terminalId + " 已添加并连接"); return true; } catch (Exception e) { System.err.println("添加终端 " + terminalId + " 失败: " + e.getMessage()); return false; } } public boolean removeTerminal(String terminalId) { // 动态移除终端 try { if (terminals.containsKey(terminalId)) { // 断开连接 sdk.disconnect(terminalId); // 移除记录 terminals.remove(terminalId); System.out.println("终端 " + terminalId + " 已移除"); return true; } else { System.out.println("终端 " + terminalId + " 不存在"); return false; } } catch (Exception e) { System.err.println("移除终端 " + terminalId + " 失败: " + e.getMessage()); return false; } } public boolean sendToTerminal(String terminalId, Signal signal) { // 向指定终端发送信号 try { sdk.sendSignal(terminalId, signal); return true; } catch (Exception e) { System.err.println("发送失败: " + e.getMessage()); return false; } } public Set<String> listTerminals() { // 列出所有终端 return terminals.keySet(); } public void closeAll() { // 关闭所有终端 sdk.disconnectAll(); terminals.clear(); } public static void main(String[] args) { TerminalManager manager = new TerminalManager(); // 动态添加终端 Config config1 = new Config.Builder()...build(); manager.addTerminal("sensor-1", config1); Config config2 = new Config.Builder()...build(); manager.addTerminal("sensor-2", config2); // 列出终端 System.out.println("当前终端: " + manager.listTerminals()); // 发送数据 Signal signal = new Signal("sensor.data"); signal.getPayload().setNumber("value", 25.5); manager.sendToTerminal("sensor-1", signal); // 移除终端 manager.removeTerminal("sensor-2"); // 清理 manager.closeAll(); } }

网关场景

作为网关管理多个下游设备:

import java.util.HashMap; import java.util.Map; public class Gateway { private SDK sdk; private Map<String, DeviceInfo> devices; static class DeviceInfo { Config config; boolean connected; DeviceInfo(Config config) { this.config = config; this.connected = false; } } public Gateway() { this.sdk = new SDK(); this.devices = new HashMap<>(); } public void registerDevice(String deviceId, Config deviceConfig) { // 注册设备 sdk.registerTerminal(deviceId, deviceConfig); devices.put(deviceId, new DeviceInfo(deviceConfig)); // 设置回调 sdk.setDirectiveCallback(deviceId, directive -> { handleDeviceDirective(deviceId, directive); }); sdk.setConnectionCallback(deviceId, (status, message) -> { DeviceInfo device = devices.get(deviceId); if (device != null) { device.connected = "connected".equals(status); } System.out.println("设备 " + deviceId + " 连接状态: " + status); }); } private void handleDeviceDirective(String deviceId, Directive directive) { // 处理设备指令 System.out.println("设备 " + deviceId + " 收到指令: " + directive.getName()); // 根据指令类型分发处理 if ("device.query".equals(directive.getName())) { respondToQuery(deviceId, directive); } else if ("device.control".equals(directive.getName())) { controlDevice(deviceId, directive); } } private void respondToQuery(String deviceId, Directive directive) { // 响应查询指令 try { Signal signal = new Signal("device.status"); DeviceInfo device = devices.get(deviceId); signal.getPayload().setString("status", "online"); signal.getPayload().setBoolean("connected", device.connected); sdk.sendSignal(deviceId, signal); } catch (Exception e) { e.printStackTrace(); } } private void controlDevice(String deviceId, Directive directive) { // 控制设备 Payload payload = directive.getPayload(); String action = payload.getString("action"); System.out.println("执行设备 " + deviceId + " 的动作: " + action); // 执行控制逻辑... } public void start() { // 启动网关 Map<String, Boolean> results = sdk.connectAll(); for (Map.Entry<String, Boolean> entry : results.entrySet()) { String status = entry.getValue() ? "成功" : "失败"; System.out.println("设备 " + entry.getKey() + " 连接" + status); } } public void broadcastSignal(Signal signal) { // 广播信号到所有设备 for (String deviceId : devices.keySet()) { try { sdk.sendSignal(deviceId, signal); } catch (Exception e) { System.err.println("广播到 " + deviceId + " 失败: " + e.getMessage()); } } } public void stop() { // 停止网关 sdk.disconnectAll(); } public static void main(String[] args) { Gateway gateway = new Gateway(); // 注册设备 Config device1Config = new Config.Builder()...build(); gateway.registerDevice("device-1", device1Config); Config device2Config = new Config.Builder()...build(); gateway.registerDevice("device-2", device2Config); // 启动网关 gateway.start(); // 广播信号 Signal broadcastSignal = new Signal("gateway.announcement"); broadcastSignal.getPayload().setString("message", "Gateway online"); gateway.broadcastSignal(broadcastSignal); try { // 保持运行 Thread.sleep(3600000); } catch (InterruptedException e) { e.printStackTrace(); } // 停止网关 gateway.stop(); } }

并发发送数据

使用 ExecutorService 并发发送数据到多个终端:

import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class ConcurrentSendExample { public static void sendToAllSensors( SDK sdk, List<String> sensorIds, double value, String timestamp ) throws InterruptedException { // 并发发送数据到所有传感器 ExecutorService executor = Executors.newFixedThreadPool(sensorIds.size()); List<Future<?>> futures = new ArrayList<>(); for (String sensorId : sensorIds) { Future<?> future = executor.submit(() -> { try { Signal signal = new Signal("sensor.data"); Payload payload = signal.getPayload(); payload.setNumber("value", value); payload.setString("timestamp", timestamp); sdk.sendSignal(sensorId, signal); System.out.println(sensorId + ": 发送成功"); } catch (Exception e) { System.err.println(sensorId + ": 发送失败 - " + e.getMessage()); } }); futures.add(future); } // 等待所有任务完成 for (Future<?> future : futures) { try { future.get(); } catch (ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); } public static void main(String[] args) { SDK sdk = new SDK(); // 注册多个传感器 List<String> sensorIds = new ArrayList<>(); sensorIds.add("sensor-1"); sensorIds.add("sensor-2"); sensorIds.add("sensor-3"); sensorIds.add("sensor-4"); for (String sensorId : sensorIds) { Config config = new Config.Builder() .deviceId(sensorId) // ... 其他配置参数 .build(); sdk.registerTerminal(sensorId, config); } try { // 连接所有传感器 sdk.connectAll(); // 并发发送数据 sendToAllSensors(sdk, sensorIds, 25.5, "2024-01-01T00:00:00Z"); sdk.disconnectAll(); } catch (Exception e) { e.printStackTrace(); } } }

最佳实践

  1. 资源管理 - 使用单个 SDK 实例管理所有终端
  2. 错误隔离 - 单个终端的错误不应影响其他终端
  3. 并发控制 - 使用 ExecutorService 并发操作多个终端
  4. 状态监控 - 实时监控所有终端的连接状态
  5. 优雅关闭 - 确保所有终端正确断开连接

下一步

Last updated on