错误处理
完善的错误处理策略,确保应用稳定可靠。
基础错误处理
import com.streamind.sdk.*;
public class BasicErrorHandling {
/**
 * Demonstrates basic error handling around connect, send, and disconnect.
 *
 * <p>Registers "terminal-1", connects (returning early when that fails),
 * sends one "sensor.data" signal, and always attempts to disconnect once a
 * connection has been established.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    SDK sdk = new SDK();
    Config config = new Config.Builder()...build();
    sdk.registerTerminal("terminal-1", config);

    // Connection error handling: without a connection there is nothing else to do.
    try {
        sdk.connect("terminal-1");
        System.out.println("连接成功");
    } catch (ConnectionException e) {
        System.err.println("连接失败: " + e.getMessage());
        return;
    } catch (Exception e) {
        System.err.println("未知错误: " + e.getMessage());
        return;
    }

    try {
        // Signal send error handling
        Signal signal = new Signal("sensor.data");
        signal.getPayload().setNumber("value", 25.5);
        try {
            sdk.sendSignal("terminal-1", signal);
            System.out.println("信号发送成功");
        } catch (SendException e) {
            System.err.println("发送失败: " + e.getMessage());
        }
    } finally {
        // Disconnect in finally so an unexpected failure while building or
        // sending the signal cannot leave the connection open.
        try {
            sdk.disconnect("terminal-1");
        } catch (Exception e) {
            System.err.println("断开连接时出错: " + e.getMessage());
        }
    }
}
}
重试机制
连接重试
public class ConnectionRetry {
/**
 * Connects a terminal, retrying with a fixed delay between attempts.
 *
 * @param sdk          SDK instance used to connect
 * @param terminalId   identifier of the terminal to connect
 * @param maxRetries   maximum number of connection attempts
 * @param retryDelayMs pause between attempts, in milliseconds
 * @return {@code true} once a connection attempt succeeds; {@code false} when
 *         all attempts fail, when {@code maxRetries} is not positive, or when
 *         the thread is interrupted while waiting
 */
public static boolean connectWithRetry(
        SDK sdk,
        String terminalId,
        int maxRetries,
        int retryDelayMs
) {
    for (int attemptNumber = 1; attemptNumber <= maxRetries; attemptNumber++) {
        try {
            System.out.println("连接尝试 " + attemptNumber + "/" + maxRetries + "...");
            sdk.connect(terminalId);
            System.out.println("连接成功");
            return true;
        } catch (Exception e) {
            System.err.println("连接失败: " + e.getMessage());
        }

        // Only reached after a failed attempt.
        if (attemptNumber == maxRetries) {
            System.out.println("达到最大重试次数,连接失败");
            return false;
        }

        System.out.println("等待 " + (retryDelayMs / 1000) + " 秒后重试...");
        try {
            Thread.sleep(retryDelayMs);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and give up instead of retrying.
            Thread.currentThread().interrupt();
            return false;
        }
    }
    return false;
}
/**
 * Example entry point: connects "terminal-1" with up to 5 attempts spaced
 * 5000 ms apart, keeps the connection open for 60 seconds, then disconnects.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    SDK sdk = new SDK();
    Config config = new Config.Builder()...build();
    sdk.registerTerminal("terminal-1", config);

    // Connect with retries
    boolean success = connectWithRetry(sdk, "terminal-1", 5, 5000);
    if (!success) {
        System.out.println("无法建立连接");
        return;
    }

    // Continue with follow-up work
    try {
        Thread.sleep(60000);
    } catch (InterruptedException e) {
        // Keep the interrupt flag set; the disconnect below still runs.
        Thread.currentThread().interrupt();
    } finally {
        // Always release the connection, even when the wait was interrupted.
        try {
            sdk.disconnect("terminal-1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
}
发送重试
public class SendRetry {
/**
 * Sends a signal, retrying with a fixed one-second pause between attempts.
 *
 * @param sdk        SDK instance used to send
 * @param terminalId identifier of the terminal to send through
 * @param signal     signal to send
 * @param maxRetries maximum number of send attempts
 * @return {@code true} once a send succeeds; {@code false} when all attempts
 *         fail, when {@code maxRetries} is not positive, or when the thread is
 *         interrupted while waiting
 */
public static boolean sendWithRetry(
        SDK sdk,
        String terminalId,
        Signal signal,
        int maxRetries
) {
    for (int attemptNumber = 1; attemptNumber <= maxRetries; attemptNumber++) {
        try {
            sdk.sendSignal(terminalId, signal);
            System.out.println("信号发送成功(尝试 " + attemptNumber + ")");
            return true;
        } catch (Exception e) {
            System.err.println("发送失败(尝试 " + attemptNumber + "): " + e.getMessage());
        }

        // Only reached after a failed attempt.
        if (attemptNumber == maxRetries) {
            System.out.println("达到最大重试次数,发送失败");
            return false;
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            return false;
        }
    }
    return false;
}
// 使用
/**
 * Usage example: builds a "sensor.data" signal and sends it through
 * "terminal-1" with up to 5 attempts.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    SDK client = new SDK();
    // ... configure and connect ...

    Signal reading = new Signal("sensor.data");
    reading.getPayload().setNumber("value", 25.5);

    boolean delivered = sendWithRetry(client, "terminal-1", reading, 5);
}
}
指数退避重试
import java.util.Random;
public class ExponentialBackoff {
private static final Random random = new Random();
/**
 * Sends a signal, retrying with exponential backoff plus random jitter.
 *
 * <p>The wait before retry {@code k} (0-based) is
 * {@code min(1000 * 2^k, 60000)} ms plus up to 10% of that value as jitter.
 *
 * @param sdk        SDK instance used to send
 * @param terminalId identifier of the terminal to send through
 * @param signal     signal to send
 * @param maxRetries maximum number of send attempts
 * @return {@code true} once a send succeeds; {@code false} when all attempts
 *         fail, when {@code maxRetries} is not positive, or when the thread is
 *         interrupted while waiting
 */
public static boolean sendWithExponentialBackoff(
        SDK sdk,
        String terminalId,
        Signal signal,
        int maxRetries
) {
    final int baseDelay = 1000;  // base delay (milliseconds)
    final int maxDelay = 60000;  // delay cap (milliseconds)

    for (int attempt = 0; attempt < maxRetries; attempt++) {
        final int attemptNumber = attempt + 1;
        try {
            sdk.sendSignal(terminalId, signal);
            System.out.println("发送成功(尝试 " + attemptNumber + ")");
            return true;
        } catch (Exception e) {
            System.err.println("发送失败(尝试 " + attemptNumber + "): " + e.getMessage());
        }

        // Only reached after a failed attempt.
        final boolean lastAttempt = attempt >= maxRetries - 1;
        if (lastAttempt) {
            System.out.println("达到最大重试次数");
            return false;
        }

        // Delay = 2^attempt * baseDelay, capped at maxDelay, plus random jitter.
        final int uncapped = (int) (baseDelay * Math.pow(2, attempt));
        final int delay = Math.min(uncapped, maxDelay);
        final int jitter = (int) (random.nextDouble() * 0.1 * delay); // up to 10% jitter
        final int totalDelay = delay + jitter;

        System.out.println("等待 " + (totalDelay / 1000.0) + " 秒后重试...");
        try {
            Thread.sleep(totalDelay);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            return false;
        }
    }
    return false;
}
}
超时处理
import java.util.concurrent.*;
public class TimeoutHandling {
/**
 * Connects a terminal, giving up after a timeout.
 *
 * <p>The connect call runs on a dedicated single-thread executor. On timeout
 * or interruption the task is cancelled with interruption and the executor is
 * shut down with {@code shutdownNow()}, so the worker thread is not left
 * running in the background.
 * NOTE(review): cancellation only stops the call if {@code sdk.connect}
 * responds to thread interruption; confirm against the SDK.
 *
 * @param sdk            SDK instance used to connect
 * @param terminalId     identifier of the terminal to connect
 * @param timeoutSeconds maximum time to wait, in seconds
 * @return {@code true} if the connection completed within the timeout;
 *         {@code false} on timeout, failure, or interruption
 */
public static boolean connectWithTimeout(
        SDK sdk,
        String terminalId,
        int timeoutSeconds
) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> future = null;
    try {
        // Submitted as a Callable so the original exception becomes the direct
        // cause of ExecutionException instead of being wrapped a second time.
        future = executor.submit(() -> {
            sdk.connect(terminalId);
            return null;
        });
        future.get(timeoutSeconds, TimeUnit.SECONDS);
        System.out.println("连接成功");
        return true;
    } catch (TimeoutException e) {
        // Stop the still-running attempt instead of leaving it in the background.
        future.cancel(true);
        System.err.println("连接超时(" + timeoutSeconds + "秒)");
        return false;
    } catch (InterruptedException e) {
        future.cancel(true);
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        System.err.println("连接失败: " + e.getMessage());
        return false;
    } catch (ExecutionException e) {
        // Report the failure raised by the connect call itself.
        Throwable failure = (e.getCause() != null) ? e.getCause() : e;
        System.err.println("连接失败: " + failure.getMessage());
        return false;
    } catch (Exception e) {
        System.err.println("连接失败: " + e.getMessage());
        return false;
    } finally {
        // shutdownNow() interrupts a task that is still running; shutdown()
        // would leave it (and its non-daemon thread) alive.
        executor.shutdownNow();
    }
}
/**
 * Sends a signal, giving up after a timeout.
 *
 * <p>The send call runs on a dedicated single-thread executor. On timeout or
 * interruption the task is cancelled with interruption and the executor is
 * shut down with {@code shutdownNow()}, so the worker thread is not left
 * running in the background.
 * NOTE(review): cancellation only stops the call if {@code sdk.sendSignal}
 * responds to thread interruption; confirm against the SDK.
 *
 * @param sdk            SDK instance used to send
 * @param terminalId     identifier of the terminal to send through
 * @param signal         signal to send
 * @param timeoutSeconds maximum time to wait, in seconds
 * @return {@code true} if the send completed within the timeout;
 *         {@code false} on timeout, failure, or interruption
 */
public static boolean sendWithTimeout(
        SDK sdk,
        String terminalId,
        Signal signal,
        int timeoutSeconds
) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> future = null;
    try {
        // Submitted as a Callable so the original exception becomes the direct
        // cause of ExecutionException instead of being wrapped a second time.
        future = executor.submit(() -> {
            sdk.sendSignal(terminalId, signal);
            return null;
        });
        future.get(timeoutSeconds, TimeUnit.SECONDS);
        System.out.println("发送成功");
        return true;
    } catch (TimeoutException e) {
        // Stop the still-running attempt instead of leaving it in the background.
        future.cancel(true);
        System.err.println("发送超时(" + timeoutSeconds + "秒)");
        return false;
    } catch (InterruptedException e) {
        future.cancel(true);
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        System.err.println("发送失败: " + e.getMessage());
        return false;
    } catch (ExecutionException e) {
        // Report the failure raised by the send call itself.
        Throwable failure = (e.getCause() != null) ? e.getCause() : e;
        System.err.println("发送失败: " + failure.getMessage());
        return false;
    } catch (Exception e) {
        System.err.println("发送失败: " + e.getMessage());
        return false;
    } finally {
        // shutdownNow() interrupts a task that is still running; shutdown()
        // would leave it (and its non-daemon thread) alive.
        executor.shutdownNow();
    }
}
}
配置验证
public class ConfigValidator {
/**
 * Validates a configuration, printing the first problem found to stderr.
 *
 * @param config configuration to check
 * @return {@code true} when every check passes, {@code false} otherwise
 */
public static boolean validate(Config config) {
    final String problem = firstProblem(config);
    if (problem == null) {
        return true;
    }
    System.err.println(problem);
    return false;
}

/**
 * Runs the checks in order (deviceId, deviceType, endpoint, tenantId,
 * productId, productKey) and returns the message for the first failure,
 * or {@code null} when the configuration is valid.
 */
private static String firstProblem(Config config) {
    if (config.getDeviceId() == null || config.getDeviceId().isEmpty()) {
        return "错误:deviceId 不能为空";
    }
    if (config.getDeviceType() == null || config.getDeviceType().isEmpty()) {
        return "错误:deviceType 不能为空";
    }
    // The endpoint must be present and use the wss:// scheme.
    if (config.getEndpoint() == null || !config.getEndpoint().startsWith("wss://")) {
        return "错误:endpoint 必须使用 wss:// 协议";
    }
    if (config.getTenantId() == null || config.getTenantId().isEmpty()) {
        return "错误:tenantId 不能为空";
    }
    if (config.getProductId() == null || config.getProductId().isEmpty()) {
        return "错误:productId 不能为空";
    }
    if (config.getProductKey() == null || config.getProductKey().isEmpty()) {
        return "错误:productKey 不能为空";
    }
    return null;
}
// 使用
/**
 * Usage example: registers "terminal-1" only when the configuration passes
 * validation.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Config settings = new Config.Builder()...build();

    // Guard clause: refuse to register a terminal with an invalid configuration.
    if (!ConfigValidator.validate(settings)) {
        System.err.println("配置无效");
        return;
    }

    SDK client = new SDK();
    client.registerTerminal("terminal-1", settings);
}
}
数据验证
public class SensorDataValidator {
// 传感器数据验证器
/**
 * Validates a temperature reading.
 *
 * <p>Accepts values in the closed range [-50, 150] degrees Celsius. NaN is
 * rejected as well: the check is written as "not inside the range", because
 * NaN compares false against both bounds and would pass an
 * "outside the range" test.
 *
 * @param temperature temperature in degrees Celsius
 * @throws IllegalArgumentException if the value is NaN or outside [-50, 150]
 */
public static void validateTemperature(double temperature) throws IllegalArgumentException {
    boolean inRange = temperature >= -50 && temperature <= 150;
    if (!inRange) {
        throw new IllegalArgumentException("温度值超出范围: " + temperature + "°C");
    }
}
/**
 * Builds a "sensor.temperature" signal from a validated temperature.
 *
 * @param temperature temperature in degrees Celsius
 * @return a signal whose payload carries {@code value} and {@code unit}
 * @throws IllegalArgumentException if {@code validateTemperature} rejects the value
 */
public static Signal createValidatedSignal(double temperature) throws IllegalArgumentException {
    // Validate first so no signal is created for an invalid reading.
    validateTemperature(temperature);

    final Signal reading = new Signal("sensor.temperature");
    reading.getPayload().setNumber("value", temperature);
    reading.getPayload().setString("unit", "celsius");
    return reading;
}
// 使用
/**
 * Usage example: creates a validated temperature signal and sends it through
 * "terminal-1", reporting validation failures separately from other errors.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    SDK client = new SDK();
    // ... configure and connect ...

    try {
        Signal reading = SensorDataValidator.createValidatedSignal(25.5);
        client.sendSignal("terminal-1", reading);
    } catch (IllegalArgumentException invalid) {
        // Raised by the validator when the reading is rejected.
        System.err.println("验证错误: " + invalid.getMessage());
    } catch (Exception other) {
        other.printStackTrace();
    }
}
}
日志记录
使用 SLF4J 和 Logback 进行日志记录:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LoggingExample {
private static final Logger logger = LoggerFactory.getLogger(LoggingExample.class);
/**
 * Logging example: connects "terminal-1", sends one "sensor.data" signal,
 * waits 60 seconds, and disconnects, logging each step at INFO level. Any
 * exception is logged once at ERROR level with its stack trace.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final String terminalId = "terminal-1";
    try {
        SDK client = new SDK();
        Config settings = new Config.Builder()...build();
        client.registerTerminal(terminalId, settings);

        // Connect
        logger.info("开始连接到 StreamInd 平台");
        client.connect(terminalId);
        logger.info("连接成功");

        // Send one reading
        Signal reading = new Signal("sensor.data");
        reading.getPayload().setNumber("value", 25.5);
        logger.info("发送传感器数据");
        client.sendSignal(terminalId, reading);
        logger.info("数据发送成功");

        // Keep the connection open for a minute before disconnecting.
        Thread.sleep(60000);

        logger.info("断开连接");
        client.disconnect(terminalId);
        logger.info("已断开连接");
    } catch (Exception failure) {
        logger.error("应用程序错误", failure);
    }
}
}
logback.xml 配置:
<configuration>
<!-- FILE appender: writes log events to streamind-sdk.log.
     The pattern includes time, thread name, level, logger name and message. -->
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>streamind-sdk.log</file>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- CONSOLE appender: same pattern as FILE, without the thread name. -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Root logger: level INFO, sending events to both appenders above. -->
<root level="INFO">
<appender-ref ref="FILE" />
<appender-ref ref="CONSOLE" />
</root>
</configuration>
优雅退出
public class GracefulShutdown {
private SDK sdk;
private volatile boolean running = true;
/**
 * Starts the application: registers and connects "terminal-1", then runs the
 * main loop until {@code running} becomes false or the thread is interrupted.
 *
 * <p>{@code cleanup()} runs in a {@code finally} block, so resources are
 * released however the method exits, including when connecting fails or an
 * unexpected error escapes the loop.
 */
public void start() {
    Config config = new Config.Builder()...build();
    sdk = new SDK();
    sdk.registerTerminal("terminal-1", config);
    try {
        // Connect
        sdk.connect("terminal-1");
        System.out.println("应用已启动");

        // Main loop
        while (running) {
            try {
                // Application logic...
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // Report the error and keep the loop running.
                System.err.println("运行时错误: " + e.getMessage());
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Cleanup must not depend on the try block completing normally.
        cleanup();
    }
}
/**
 * Releases resources by disconnecting every terminal. Failures are reported
 * to stderr and not rethrown.
 */
private void cleanup() {
    System.out.println("正在清理资源...");
    try {
        this.sdk.disconnectAll();
        System.out.println("资源清理完成");
    } catch (Exception failure) {
        final String reason = failure.getMessage();
        System.err.println("清理时出错: " + reason);
    }
}
/**
 * Requests a stop by clearing the {@code running} flag that the main loop
 * in {@code start()} checks.
 */
public void stop() {
    System.out.println("收到停止信号");
    this.running = false;
}
/**
 * Entry point: installs a shutdown hook that requests a stop and waits for
 * the main thread to finish its cleanup, then runs the application.
 *
 * <p>The hook joins the main thread (bounded to 5 seconds) instead of sleeping
 * for a fixed time. The main loop can take up to a second to notice the stop
 * request, so a fixed one-second sleep could let the JVM halt before cleanup
 * completed. If the main thread has already finished, the join returns
 * immediately.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    GracefulShutdown app = new GracefulShutdown();
    // The thread that runs start() and therefore cleanup().
    final Thread mainThread = Thread.currentThread();

    // Register the shutdown hook
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        app.stop();
        try {
            // Wait for the main thread to leave its loop and clean up; the
            // timeout keeps shutdown from hanging if it never finishes.
            mainThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }));

    app.start();
}
}
最佳实践
- 异常捕获 - 捕获所有可能的异常,避免程序崩溃
- 重试机制 - 网络操作使用指数退避重试
- 超时设置 - 为所有阻塞操作设置合理超时
- 日志记录 - 记录所有关键操作和错误
- 优雅退出 - 确保资源正确释放
- 数据验证 - 发送前验证数据有效性
- 线程安全 - 多线程环境下注意同步
相关文档
Last updated on