Skip to Content

错误处理

完善的错误处理策略,确保应用稳定可靠。

基础错误处理

import com.streamind.sdk.*; public class BasicErrorHandling { public static void main(String[] args) { SDK sdk = new SDK(); Config config = new Config.Builder()...build(); sdk.registerTerminal("terminal-1", config); // 连接错误处理 try { sdk.connect("terminal-1"); System.out.println("连接成功"); } catch (ConnectionException e) { System.err.println("连接失败: " + e.getMessage()); return; } catch (Exception e) { System.err.println("未知错误: " + e.getMessage()); return; } // 发送信号错误处理 Signal signal = new Signal("sensor.data"); signal.getPayload().setNumber("value", 25.5); try { sdk.sendSignal("terminal-1", signal); System.out.println("信号发送成功"); } catch (SendException e) { System.err.println("发送失败: " + e.getMessage()); } // 断开连接 try { sdk.disconnect("terminal-1"); } catch (Exception e) { System.err.println("断开连接时出错: " + e.getMessage()); } } }

重试机制

连接重试

public class ConnectionRetry { public static boolean connectWithRetry( SDK sdk, String terminalId, int maxRetries, int retryDelayMs ) { // 带重试的连接 for (int attempt = 0; attempt < maxRetries; attempt++) { try { System.out.println("连接尝试 " + (attempt + 1) + "/" + maxRetries + "..."); sdk.connect(terminalId); System.out.println("连接成功"); return true; } catch (Exception e) { System.err.println("连接失败: " + e.getMessage()); if (attempt < maxRetries - 1) { System.out.println("等待 " + (retryDelayMs / 1000) + " 秒后重试..."); try { Thread.sleep(retryDelayMs); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return false; } } else { System.out.println("达到最大重试次数,连接失败"); return false; } } } return false; } public static void main(String[] args) { SDK sdk = new SDK(); Config config = new Config.Builder()...build(); sdk.registerTerminal("terminal-1", config); // 带重试的连接 boolean success = connectWithRetry(sdk, "terminal-1", 5, 5000); if (success) { // 继续后续操作 try { Thread.sleep(60000); sdk.disconnect("terminal-1"); } catch (Exception e) { e.printStackTrace(); } } else { System.out.println("无法建立连接"); } } }

发送重试

public class SendRetry { public static boolean sendWithRetry( SDK sdk, String terminalId, Signal signal, int maxRetries ) { // 带重试的信号发送 for (int attempt = 0; attempt < maxRetries; attempt++) { try { sdk.sendSignal(terminalId, signal); System.out.println("信号发送成功(尝试 " + (attempt + 1) + ")"); return true; } catch (Exception e) { System.err.println("发送失败(尝试 " + (attempt + 1) + "): " + e.getMessage()); if (attempt < maxRetries - 1) { try { Thread.sleep(1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return false; } } else { System.out.println("达到最大重试次数,发送失败"); return false; } } } return false; } // 使用 public static void main(String[] args) { SDK sdk = new SDK(); // ... 配置和连接 ... Signal signal = new Signal("sensor.data"); signal.getPayload().setNumber("value", 25.5); boolean success = sendWithRetry(sdk, "terminal-1", signal, 5); } }

指数退避重试

import java.util.Random; public class ExponentialBackoff { private static final Random random = new Random(); public static boolean sendWithExponentialBackoff( SDK sdk, String terminalId, Signal signal, int maxRetries ) { // 指数退避重试 int baseDelay = 1000; // 基础延迟(毫秒) int maxDelay = 60000; // 最大延迟(毫秒) for (int attempt = 0; attempt < maxRetries; attempt++) { try { sdk.sendSignal(terminalId, signal); System.out.println("发送成功(尝试 " + (attempt + 1) + ")"); return true; } catch (Exception e) { System.err.println("发送失败(尝试 " + (attempt + 1) + "): " + e.getMessage()); if (attempt < maxRetries - 1) { // 计算延迟时间:2^attempt * baseDelay + 随机抖动 int delay = Math.min((int) (baseDelay * Math.pow(2, attempt)), maxDelay); int jitter = (int) (random.nextDouble() * 0.1 * delay); // 10% 抖动 int totalDelay = delay + jitter; System.out.println("等待 " + (totalDelay / 1000.0) + " 秒后重试..."); try { Thread.sleep(totalDelay); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return false; } } else { System.out.println("达到最大重试次数"); return false; } } } return false; } }

超时处理

import java.util.concurrent.*; public class TimeoutHandling { public static boolean connectWithTimeout( SDK sdk, String terminalId, int timeoutSeconds ) { // 带超时的连接 ExecutorService executor = Executors.newSingleThreadExecutor(); try { Future<?> future = executor.submit(() -> { try { sdk.connect(terminalId); } catch (Exception e) { throw new RuntimeException(e); } }); future.get(timeoutSeconds, TimeUnit.SECONDS); System.out.println("连接成功"); return true; } catch (TimeoutException e) { System.err.println("连接超时(" + timeoutSeconds + "秒)"); return false; } catch (Exception e) { System.err.println("连接失败: " + e.getMessage()); return false; } finally { executor.shutdown(); } } public static boolean sendWithTimeout( SDK sdk, String terminalId, Signal signal, int timeoutSeconds ) { // 带超时的发送 ExecutorService executor = Executors.newSingleThreadExecutor(); try { Future<?> future = executor.submit(() -> { try { sdk.sendSignal(terminalId, signal); } catch (Exception e) { throw new RuntimeException(e); } }); future.get(timeoutSeconds, TimeUnit.SECONDS); System.out.println("发送成功"); return true; } catch (TimeoutException e) { System.err.println("发送超时(" + timeoutSeconds + "秒)"); return false; } catch (Exception e) { System.err.println("发送失败: " + e.getMessage()); return false; } finally { executor.shutdown(); } } }

配置验证

public class ConfigValidator { public static boolean validate(Config config) { // 验证配置 if (config.getDeviceId() == null || config.getDeviceId().isEmpty()) { System.err.println("错误:deviceId 不能为空"); return false; } if (config.getDeviceType() == null || config.getDeviceType().isEmpty()) { System.err.println("错误:deviceType 不能为空"); return false; } if (config.getEndpoint() == null || !config.getEndpoint().startsWith("wss://")) { System.err.println("错误:endpoint 必须使用 wss:// 协议"); return false; } if (config.getTenantId() == null || config.getTenantId().isEmpty()) { System.err.println("错误:tenantId 不能为空"); return false; } if (config.getProductId() == null || config.getProductId().isEmpty()) { System.err.println("错误:productId 不能为空"); return false; } if (config.getProductKey() == null || config.getProductKey().isEmpty()) { System.err.println("错误:productKey 不能为空"); return false; } return true; } // 使用 public static void main(String[] args) { Config config = new Config.Builder()...build(); if (ConfigValidator.validate(config)) { SDK sdk = new SDK(); sdk.registerTerminal("terminal-1", config); } else { System.err.println("配置无效"); } } }

数据验证

public class SensorDataValidator { // 传感器数据验证器 public static void validateTemperature(double temperature) throws IllegalArgumentException { // 验证温度值 if (temperature < -50 || temperature > 150) { throw new IllegalArgumentException("温度值超出范围: " + temperature + "°C"); } } public static Signal createValidatedSignal(double temperature) throws IllegalArgumentException { // 创建经过验证的温度信号 validateTemperature(temperature); Signal signal = new Signal("sensor.temperature"); signal.getPayload().setNumber("value", temperature); signal.getPayload().setString("unit", "celsius"); return signal; } // 使用 public static void main(String[] args) { SDK sdk = new SDK(); // ... 配置和连接 ... try { Signal signal = SensorDataValidator.createValidatedSignal(25.5); sdk.sendSignal("terminal-1", signal); } catch (IllegalArgumentException e) { System.err.println("验证错误: " + e.getMessage()); } catch (Exception e) { e.printStackTrace(); } } }

日志记录

使用 SLF4J 和 Logback 进行日志记录:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class LoggingExample { private static final Logger logger = LoggerFactory.getLogger(LoggingExample.class); public static void main(String[] args) { try { SDK sdk = new SDK(); Config config = new Config.Builder()...build(); sdk.registerTerminal("terminal-1", config); logger.info("开始连接到 StreamInd 平台"); sdk.connect("terminal-1"); logger.info("连接成功"); Signal signal = new Signal("sensor.data"); signal.getPayload().setNumber("value", 25.5); logger.info("发送传感器数据"); sdk.sendSignal("terminal-1", signal); logger.info("数据发送成功"); Thread.sleep(60000); logger.info("断开连接"); sdk.disconnect("terminal-1"); logger.info("已断开连接"); } catch (Exception e) { logger.error("应用程序错误", e); } } }

logback.xml 配置

<configuration> <appender name="FILE" class="ch.qos.logback.core.FileAppender"> <file>streamind-sdk.log</file> <encoder> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <root level="INFO"> <appender-ref ref="FILE" /> <appender-ref ref="CONSOLE" /> </root> </configuration>

优雅退出

public class GracefulShutdown { private SDK sdk; private volatile boolean running = true; public void start() { // 启动应用 Config config = new Config.Builder()...build(); sdk = new SDK(); sdk.registerTerminal("terminal-1", config); try { // 连接 sdk.connect("terminal-1"); System.out.println("应用已启动"); // 主循环 while (running) { try { // 应用逻辑... Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { System.err.println("运行时错误: " + e.getMessage()); } } // 清理 cleanup(); } catch (Exception e) { e.printStackTrace(); } } private void cleanup() { // 清理资源 System.out.println("正在清理资源..."); try { sdk.disconnectAll(); System.out.println("资源清理完成"); } catch (Exception e) { System.err.println("清理时出错: " + e.getMessage()); } } public void stop() { // 停止应用 System.out.println("收到停止信号"); running = false; } public static void main(String[] args) { GracefulShutdown app = new GracefulShutdown(); // 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> { app.stop(); try { Thread.sleep(1000); // 给应用时间来清理 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } })); app.start(); } }

最佳实践

  1. 异常捕获 - 捕获所有可能的异常,避免程序崩溃
  2. 重试机制 - 网络操作使用指数退避重试
  3. 超时设置 - 为所有阻塞操作设置合理超时
  4. 日志记录 - 记录所有关键操作和错误
  5. 优雅退出 - 确保资源正确释放
  6. 数据验证 - 发送前验证数据有效性
  7. 线程安全 - 多线程环境下注意同步

相关文档

Last updated on