多终端管理
使用单个 SDK 实例管理多个设备连接,适用于网关、集中控制器等场景。
基础多终端连接
import com.streamind.sdk.*;
import java.util.Map;
public class MultiTerminalExample {
public static void main(String[] args) {
// 创建 SDK 实例
SDK sdk = new SDK();
// 为不同设备创建配置
Config config1 = new Config.Builder()
.deviceId("sensor-001")
.deviceType("temperature")
.endpoint("wss://your-platform.com/signals")
.tenantId("your-tenant-id")
.productId("product-sensors")
.productKey("your-secret-key")
.build();
Config config2 = new Config.Builder()
.deviceId("sensor-002")
.deviceType("humidity")
.endpoint("wss://your-platform.com/signals")
.tenantId("your-tenant-id")
.productId("product-sensors")
.productKey("your-secret-key")
.build();
Config config3 = new Config.Builder()
.deviceId("actuator-001")
.deviceType("motor")
.endpoint("wss://your-platform.com/signals")
.tenantId("your-tenant-id")
.productId("product-actuators")
.productKey("your-actuators-key")
.build();
// 注册所有终端
sdk.registerTerminal("sensor-1", config1);
sdk.registerTerminal("sensor-2", config2);
sdk.registerTerminal("actuator-1", config3);
System.out.println("注册了 3 个终端");
try {
// 批量连接所有终端
Map<String, Boolean> results = sdk.connectAll();
// 检查连接结果
for (Map.Entry<String, Boolean> entry : results.entrySet()) {
String status = entry.getValue() ? "成功" : "失败";
System.out.println(entry.getKey() + ": 连接" + status);
}
// 向不同终端发送信号
// 发送温度数据
Signal signal1 = new Signal("sensor.temperature");
signal1.getPayload().setNumber("value", 25.5);
sdk.sendSignal("sensor-1", signal1);
System.out.println("温度数据已发送");
// 发送湿度数据
Signal signal2 = new Signal("sensor.humidity");
signal2.getPayload().setNumber("value", 60.0);
sdk.sendSignal("sensor-2", signal2);
System.out.println("湿度数据已发送");
// 控制执行器
Signal signal3 = new Signal("actuator.control");
signal3.getPayload().setString("action", "start");
sdk.sendSignal("actuator-1", signal3);
System.out.println("执行器控制信号已发送");
// 保持连接
Thread.sleep(60000);
// 批量断开所有终端
sdk.disconnectAll();
System.out.println("所有终端已断开");
} catch (Exception e) {
e.printStackTrace();
}
}
}为每个终端设置独立回调
public class IndependentCallbacksExample {
public static void main(String[] args) {
SDK sdk = new SDK();
// 注册终端
sdk.registerTerminal("sensor-1", config1);
sdk.registerTerminal("sensor-2", config2);
sdk.registerTerminal("actuator-1", config3);
// 为每个终端设置独立的指令回调
sdk.setDirectiveCallback("sensor-1", directive -> {
System.out.println("[传感器1] 收到指令: " + directive.getName());
// 处理传感器1的指令
});
sdk.setDirectiveCallback("sensor-2", directive -> {
System.out.println("[传感器2] 收到指令: " + directive.getName());
// 处理传感器2的指令
});
sdk.setDirectiveCallback("actuator-1", directive -> {
System.out.println("[执行器] 收到指令: " + directive.getName());
Payload payload = directive.getPayload();
String action = payload.getString("action");
// 执行控制动作
});
// 为每个终端设置独立的连接状态回调
sdk.setConnectionCallback("sensor-1", (status, message) -> {
System.out.println("[传感器1] 连接状态: " + status);
});
sdk.setConnectionCallback("sensor-2", (status, message) -> {
System.out.println("[传感器2] 连接状态: " + status);
});
sdk.setConnectionCallback("actuator-1", (status, message) -> {
System.out.println("[执行器] 连接状态: " + status);
});
try {
// 连接所有终端
sdk.connectAll();
// 保持运行
Thread.sleep(3600000);
sdk.disconnectAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}动态终端管理
运行时动态添加和移除终端:
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class TerminalManager {
private SDK sdk;
private Map<String, Config> terminals;
public TerminalManager() {
this.sdk = new SDK();
this.terminals = new HashMap<>();
}
public boolean addTerminal(String terminalId, Config config) {
// 动态添加终端
try {
// 注册终端
sdk.registerTerminal(terminalId, config);
// 设置回调
sdk.setDirectiveCallback(terminalId, directive -> {
System.out.println("[" + terminalId + "] 收到指令: " + directive.getName());
});
// 连接
sdk.connect(terminalId);
// 记录终端
terminals.put(terminalId, config);
System.out.println("终端 " + terminalId + " 已添加并连接");
return true;
} catch (Exception e) {
System.err.println("添加终端 " + terminalId + " 失败: " + e.getMessage());
return false;
}
}
public boolean removeTerminal(String terminalId) {
// 动态移除终端
try {
if (terminals.containsKey(terminalId)) {
// 断开连接
sdk.disconnect(terminalId);
// 移除记录
terminals.remove(terminalId);
System.out.println("终端 " + terminalId + " 已移除");
return true;
} else {
System.out.println("终端 " + terminalId + " 不存在");
return false;
}
} catch (Exception e) {
System.err.println("移除终端 " + terminalId + " 失败: " + e.getMessage());
return false;
}
}
public boolean sendToTerminal(String terminalId, Signal signal) {
// 向指定终端发送信号
try {
sdk.sendSignal(terminalId, signal);
return true;
} catch (Exception e) {
System.err.println("发送失败: " + e.getMessage());
return false;
}
}
public Set<String> listTerminals() {
// 列出所有终端
return terminals.keySet();
}
public void closeAll() {
// 关闭所有终端
sdk.disconnectAll();
terminals.clear();
}
public static void main(String[] args) {
TerminalManager manager = new TerminalManager();
// 动态添加终端
Config config1 = new Config.Builder()...build();
manager.addTerminal("sensor-1", config1);
Config config2 = new Config.Builder()...build();
manager.addTerminal("sensor-2", config2);
// 列出终端
System.out.println("当前终端: " + manager.listTerminals());
// 发送数据
Signal signal = new Signal("sensor.data");
signal.getPayload().setNumber("value", 25.5);
manager.sendToTerminal("sensor-1", signal);
// 移除终端
manager.removeTerminal("sensor-2");
// 清理
manager.closeAll();
}
}网关场景
作为网关管理多个下游设备:
import java.util.HashMap;
import java.util.Map;
public class Gateway {
private SDK sdk;
private Map<String, DeviceInfo> devices;
static class DeviceInfo {
Config config;
boolean connected;
DeviceInfo(Config config) {
this.config = config;
this.connected = false;
}
}
public Gateway() {
this.sdk = new SDK();
this.devices = new HashMap<>();
}
public void registerDevice(String deviceId, Config deviceConfig) {
// 注册设备
sdk.registerTerminal(deviceId, deviceConfig);
devices.put(deviceId, new DeviceInfo(deviceConfig));
// 设置回调
sdk.setDirectiveCallback(deviceId, directive -> {
handleDeviceDirective(deviceId, directive);
});
sdk.setConnectionCallback(deviceId, (status, message) -> {
DeviceInfo device = devices.get(deviceId);
if (device != null) {
device.connected = "connected".equals(status);
}
System.out.println("设备 " + deviceId + " 连接状态: " + status);
});
}
private void handleDeviceDirective(String deviceId, Directive directive) {
// 处理设备指令
System.out.println("设备 " + deviceId + " 收到指令: " + directive.getName());
// 根据指令类型分发处理
if ("device.query".equals(directive.getName())) {
respondToQuery(deviceId, directive);
} else if ("device.control".equals(directive.getName())) {
controlDevice(deviceId, directive);
}
}
private void respondToQuery(String deviceId, Directive directive) {
// 响应查询指令
try {
Signal signal = new Signal("device.status");
DeviceInfo device = devices.get(deviceId);
signal.getPayload().setString("status", "online");
signal.getPayload().setBoolean("connected", device.connected);
sdk.sendSignal(deviceId, signal);
} catch (Exception e) {
e.printStackTrace();
}
}
private void controlDevice(String deviceId, Directive directive) {
// 控制设备
Payload payload = directive.getPayload();
String action = payload.getString("action");
System.out.println("执行设备 " + deviceId + " 的动作: " + action);
// 执行控制逻辑...
}
public void start() {
// 启动网关
Map<String, Boolean> results = sdk.connectAll();
for (Map.Entry<String, Boolean> entry : results.entrySet()) {
String status = entry.getValue() ? "成功" : "失败";
System.out.println("设备 " + entry.getKey() + " 连接" + status);
}
}
public void broadcastSignal(Signal signal) {
// 广播信号到所有设备
for (String deviceId : devices.keySet()) {
try {
sdk.sendSignal(deviceId, signal);
} catch (Exception e) {
System.err.println("广播到 " + deviceId + " 失败: " + e.getMessage());
}
}
}
public void stop() {
// 停止网关
sdk.disconnectAll();
}
public static void main(String[] args) {
Gateway gateway = new Gateway();
// 注册设备
Config device1Config = new Config.Builder()...build();
gateway.registerDevice("device-1", device1Config);
Config device2Config = new Config.Builder()...build();
gateway.registerDevice("device-2", device2Config);
// 启动网关
gateway.start();
// 广播信号
Signal broadcastSignal = new Signal("gateway.announcement");
broadcastSignal.getPayload().setString("message", "Gateway online");
gateway.broadcastSignal(broadcastSignal);
try {
// 保持运行
Thread.sleep(3600000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 停止网关
gateway.stop();
}
}并发发送数据
使用 ExecutorService 并发发送数据到多个终端:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class ConcurrentSendExample {
public static void sendToAllSensors(
SDK sdk,
List<String> sensorIds,
double value,
String timestamp
) throws InterruptedException {
// 并发发送数据到所有传感器
ExecutorService executor = Executors.newFixedThreadPool(sensorIds.size());
List<Future<?>> futures = new ArrayList<>();
for (String sensorId : sensorIds) {
Future<?> future = executor.submit(() -> {
try {
Signal signal = new Signal("sensor.data");
Payload payload = signal.getPayload();
payload.setNumber("value", value);
payload.setString("timestamp", timestamp);
sdk.sendSignal(sensorId, signal);
System.out.println(sensorId + ": 发送成功");
} catch (Exception e) {
System.err.println(sensorId + ": 发送失败 - " + e.getMessage());
}
});
futures.add(future);
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
}
public static void main(String[] args) {
SDK sdk = new SDK();
// 注册多个传感器
List<String> sensorIds = new ArrayList<>();
sensorIds.add("sensor-1");
sensorIds.add("sensor-2");
sensorIds.add("sensor-3");
sensorIds.add("sensor-4");
for (String sensorId : sensorIds) {
Config config = new Config.Builder()
.deviceId(sensorId)
// ... 其他配置参数
.build();
sdk.registerTerminal(sensorId, config);
}
try {
// 连接所有传感器
sdk.connectAll();
// 并发发送数据
sendToAllSensors(sdk, sensorIds, 25.5, "2024-01-01T00:00:00Z");
sdk.disconnectAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}最佳实践
- 资源管理 - 使用单个 SDK 实例管理所有终端
- 错误隔离 - 单个终端的错误不应影响其他终端
- 并发控制 - 使用 ExecutorService 并发操作多个终端
- 状态监控 - 实时监控所有终端的连接状态
- 优雅关闭 - 确保所有终端正确断开连接
下一步
Last updated on