Skip to Content

错误处理

完善的错误处理策略，确保应用稳定可靠。

基础错误处理

import { SDK, Config, Signal } from 'streamind-sdk';

/**
 * Basic error-handling walkthrough: register a terminal, connect, send one
 * signal and disconnect, with each step guarded by its own try/catch.
 *
 * A connection failure aborts the run; send and disconnect failures are
 * logged and the function carries on.
 */
async function main(): Promise<void> {
  const sdk = new SDK();
  const config = new Config({
    // ... configuration parameters
  });
  sdk.registerTerminal("terminal-1", config);

  // Connection error handling
  try {
    await sdk.connect("terminal-1");
    console.log("连接成功");
  } catch (error) {
    // Always report the failure. A rejection value is not guaranteed to be an
    // Error instance, and previously such values were dropped without a trace.
    const reason = error instanceof Error ? error.message : String(error);
    console.error(`连接失败: ${reason}`);
    return;
  }

  // Signal-sending error handling
  const signal = new Signal("sensor.data");
  signal.getPayload().setNumber("value", 25.5);
  try {
    await sdk.sendSignal("terminal-1", signal);
    console.log("信号发送成功");
  } catch (error) {
    console.error(`发送失败: ${error}`);
  }

  // Disconnect (still attempted when the send above failed)
  try {
    await sdk.disconnect("terminal-1");
  } catch (error) {
    console.error(`断开连接时出错: ${error}`);
  }
}

main().catch(console.error);

重试机制

连接重试

import { SDK, Config } from 'streamind-sdk';

/**
 * Tries to connect a terminal, retrying at a fixed interval.
 *
 * @param sdk SDK instance used to open the connection.
 * @param terminalId Identifier of the terminal to connect.
 * @param maxRetries Total number of connection attempts.
 * @param retryDelay Pause between attempts, in milliseconds.
 * @returns `true` once an attempt succeeds, `false` when every attempt failed.
 */
async function connectWithRetry(
  sdk: SDK,
  terminalId: string,
  maxRetries: number = 3,
  retryDelay: number = 5000
): Promise<boolean> {
  const pause = (ms: number) => new Promise(done => setTimeout(done, ms));

  let attempt = 0;
  while (attempt < maxRetries) {
    try {
      console.log(`连接尝试 ${attempt + 1}/${maxRetries}...`);
      await sdk.connect(terminalId);
      console.log("连接成功");
      return true;
    } catch (error) {
      console.error(`连接失败: ${error}`);

      // Guard clause: nothing left to try after the final attempt.
      const hasAttemptsLeft = attempt < maxRetries - 1;
      if (!hasAttemptsLeft) {
        console.log("达到最大重试次数,连接失败");
        return false;
      }

      console.log(`等待 ${retryDelay / 1000} 秒后重试...`);
      await pause(retryDelay);
    }
    attempt += 1;
  }

  // Reached only when the loop body never ran (maxRetries <= 0).
  return false;
}

/** Demo entry point: connect with up to five attempts, idle, then disconnect. */
async function main() {
  const sdk = new SDK();
  const config = new Config({
    // ... configuration parameters
  });
  sdk.registerTerminal("terminal-1", config);

  // Connect with retries
  const success = await connectWithRetry(sdk, "terminal-1", 5);
  if (!success) {
    console.log("无法建立连接");
    return;
  }

  // Continue with follow-up work
  await new Promise(resolve => setTimeout(resolve, 60000));
  await sdk.disconnect("terminal-1");
}

main().catch(console.error);

发送重试

import { SDK, Signal } from 'streamind-sdk';

/**
 * Sends a signal, retrying after a one-second pause whenever the send throws.
 *
 * @param sdk SDK instance used to send.
 * @param terminalId Terminal the signal is sent through.
 * @param signal Signal to deliver.
 * @param maxRetries Total number of send attempts.
 * @returns `true` on the first successful send, `false` if all attempts fail.
 */
async function sendWithRetry(
  sdk: SDK,
  terminalId: string,
  signal: Signal,
  maxRetries: number = 3
): Promise<boolean> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const attemptLabel = attempt + 1;
    try {
      await sdk.sendSignal(terminalId, signal);
      console.log(`信号发送成功(尝试 ${attemptLabel})`);
      return true;
    } catch (error) {
      console.error(`发送失败(尝试 ${attemptLabel}): ${error}`);
    }

    // Only reached after a failed attempt.
    const wasLastAttempt = !(attempt < maxRetries - 1);
    if (wasLastAttempt) {
      console.log("达到最大重试次数,发送失败");
      return false;
    }
    await new Promise(wake => setTimeout(wake, 1000));
  }
  return false;
}

// Usage
const signal = new Signal("sensor.data");
signal.getPayload().setNumber("value", 25.5);
const success = await sendWithRetry(sdk, "terminal-1", signal, 5);

指数退避重试

import { SDK, Signal } from 'streamind-sdk';

/**
 * Sends a signal, backing off exponentially between failed attempts.
 *
 * The wait before the next attempt is `min(1000 * 2^attempt, 60000)` ms plus
 * up to 10% random jitter on top of that value.
 *
 * @param sdk SDK instance used to send.
 * @param terminalId Terminal the signal is sent through.
 * @param signal Signal to deliver.
 * @param maxRetries Total number of send attempts.
 * @returns `true` on success, `false` once all attempts have failed.
 */
async function sendWithExponentialBackoff(
  sdk: SDK,
  terminalId: string,
  signal: Signal,
  maxRetries: number = 5
): Promise<boolean> {
  const baseDelay = 1000; // base delay (milliseconds)
  const maxDelay = 60000; // cap applied before jitter (milliseconds)

  let attempt = 0;
  while (attempt < maxRetries) {
    try {
      await sdk.sendSignal(terminalId, signal);
      console.log(`发送成功(尝试 ${attempt + 1})`);
      return true;
    } catch (error) {
      console.error(`发送失败(尝试 ${attempt + 1}): ${error}`);

      if (!(attempt < maxRetries - 1)) {
        console.log("达到最大重试次数");
        return false;
      }

      // Exponential delay, capped, then widened by up to 10% random jitter.
      const uncapped = baseDelay * Math.pow(2, attempt);
      const delay = Math.min(uncapped, maxDelay);
      const totalDelay = delay + Math.random() * 0.1 * delay;
      const seconds = (totalDelay / 1000).toFixed(2);
      console.log(`等待 ${seconds} 秒后重试...`);
      await new Promise(resume => setTimeout(resume, totalDelay));
    }
    attempt++;
  }
  return false;
}

// Usage
const signal = new Signal("sensor.data");
const success = await sendWithExponentialBackoff(sdk, "terminal-1", signal);

超时处理

import { SDK, Signal } from 'streamind-sdk';

/** Rejection reason raised by `raceAgainstTimeout` when the deadline passes. */
class OperationTimeoutError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "OperationTimeoutError";
  }
}

/**
 * Settles with `operation`, or rejects with `OperationTimeoutError` if it has
 * not settled within `timeout` milliseconds.
 *
 * The timer is always cleared once the race is decided, so a finished call
 * does not leave a pending timer keeping the event loop alive. The underlying
 * operation is NOT cancelled on timeout; it simply stops being awaited.
 */
async function raceAgainstTimeout<T>(
  operation: Promise<T>,
  timeout: number,
  timeoutMessage: string
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new OperationTimeoutError(timeoutMessage)), timeout);
  });
  try {
    return await Promise.race([operation, deadline]);
  } finally {
    clearTimeout(timer);
  }
}

/**
 * Connects a terminal, giving up after `timeout` milliseconds.
 *
 * @param sdk SDK instance used to open the connection.
 * @param terminalId Identifier of the terminal to connect.
 * @param timeout Maximum time to wait, in milliseconds.
 * @returns `true` when connected in time, `false` on timeout or failure
 *          (the cause is logged; nothing is thrown).
 */
async function connectWithTimeout(
  sdk: SDK,
  terminalId: string,
  timeout: number = 10000
): Promise<boolean> {
  try {
    await raceAgainstTimeout(sdk.connect(terminalId), timeout, "Connection timeout");
    console.log("连接成功");
    return true;
  } catch (error) {
    // Distinguish our own deadline from errors raised by the SDK by type
    // rather than by comparing message strings.
    if (error instanceof OperationTimeoutError) {
      console.error(`连接超时(${timeout / 1000}秒)`);
    } else {
      console.error(`连接失败: ${error}`);
    }
    return false;
  }
}

/**
 * Sends a signal, giving up after `timeout` milliseconds.
 *
 * @param sdk SDK instance used to send.
 * @param terminalId Terminal the signal is sent through.
 * @param signal Signal to deliver.
 * @param timeout Maximum time to wait, in milliseconds.
 * @returns `true` when sent in time, `false` on timeout or failure
 *          (the cause is logged; nothing is thrown).
 */
async function sendWithTimeout(
  sdk: SDK,
  terminalId: string,
  signal: Signal,
  timeout: number = 5000
): Promise<boolean> {
  try {
    await raceAgainstTimeout(sdk.sendSignal(terminalId, signal), timeout, "Send timeout");
    console.log("发送成功");
    return true;
  } catch (error) {
    if (error instanceof OperationTimeoutError) {
      console.error(`发送超时(${timeout / 1000}秒)`);
    } else {
      console.error(`发送失败: ${error}`);
    }
    return false;
  }
}

// Usage
await connectWithTimeout(sdk, "terminal-1", 15000);
const signal = new Signal("sensor.data");
await sendWithTimeout(sdk, "terminal-1", signal, 3000);

连接状态监控

import { SDK, Config } from 'streamind-sdk';

/**
 * Tracks a terminal's connection status through the SDK's connection callback
 * and starts a bounded reconnect loop whenever the connection drops.
 */
class ConnectionMonitor {
  private isConnected: boolean = false;
  private reconnectAttempts: number = 0;
  private maxReconnectAttempts: number = 10;
  // True while a reconnect loop is running. Stops overlapping loops when
  // several "disconnected" events arrive (for example, one per failed retry).
  private reconnecting: boolean = false;

  constructor(
    private readonly sdk: SDK,
    private readonly terminalId: string
  ) {}

  /** Registers the connection callback that drives the monitor's state. */
  setup(): void {
    this.sdk.setConnectionCallback(this.terminalId, (status, message) => {
      if (status === "connected") {
        this.isConnected = true;
        this.reconnectAttempts = 0;
        console.log("连接已建立");
      } else if (status === "disconnected") {
        this.isConnected = false;
        console.log(`连接断开: ${message}`);
        // Fire-and-forget: autoReconnect handles its own errors and is a
        // no-op if a reconnect loop is already in progress.
        void this.autoReconnect();
      } else if (status === "error") {
        console.error(`连接错误: ${message}`);
      }
    });
  }

  /**
   * Retries the connection every 5 seconds until the callback reports
   * "connected" or `maxReconnectAttempts` is reached. Returns immediately
   * when another reconnect loop is already running.
   */
  async autoReconnect(): Promise<void> {
    if (this.reconnecting) {
      return;
    }
    this.reconnecting = true;
    try {
      while (!this.isConnected && this.reconnectAttempts < this.maxReconnectAttempts) {
        this.reconnectAttempts++;
        console.log(`重连尝试 ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
        try {
          await new Promise(resolve => setTimeout(resolve, 5000)); // wait 5 seconds
          await this.sdk.connect(this.terminalId);
        } catch (error) {
          console.error(`重连失败: ${error}`);
        }
      }
      if (!this.isConnected) {
        console.log("达到最大重连次数,放弃重连");
      }
    } finally {
      this.reconnecting = false;
    }
  }
}

// Usage
async function main() {
  const sdk = new SDK();
  const config = new Config({
    // ... configuration parameters
  });
  sdk.registerTerminal("terminal-1", config);

  // Install the monitor before connecting so no status event is missed
  const monitor = new ConnectionMonitor(sdk, "terminal-1");
  monitor.setup();

  // Initial connection
  await sdk.connect("terminal-1");

  // Keep running
  await new Promise(resolve => setTimeout(resolve, 3600000));
}

main().catch(console.error);

批量操作错误处理

import { SDK, Signal } from 'streamind-sdk';

/** Outcome of sending to one terminal. */
interface SendResult {
  success: boolean;
  error?: string;
}

/**
 * Sends the same signal to several terminals concurrently and records a
 * per-terminal outcome, so one failure does not abort the others.
 *
 * @param sdk SDK instance used to send.
 * @param terminalIds Terminals to send through.
 * @param signal Signal to deliver to each terminal.
 * @returns Map from terminal id to its send outcome.
 */
async function sendToMultipleTerminals(
  sdk: SDK,
  terminalIds: string[],
  signal: Signal
): Promise<Map<string, SendResult>> {
  const results = new Map<string, SendResult>();

  // Never rejects: every failure is captured into `results`.
  const sendOne = async (terminalId: string): Promise<void> => {
    try {
      await sdk.sendSignal(terminalId, signal);
    } catch (error) {
      let errorMsg: string;
      if (error instanceof Error) {
        errorMsg = error.message;
      } else {
        errorMsg = String(error);
      }
      results.set(terminalId, { success: false, error: errorMsg });
      console.error(`${terminalId}: 发送失败 - ${errorMsg}`);
      return;
    }
    results.set(terminalId, { success: true });
    console.log(`${terminalId}: 发送成功`);
  };

  const pending: Promise<void>[] = [];
  for (const terminalId of terminalIds) {
    pending.push(sendOne(terminalId));
  }
  await Promise.all(pending);

  return results;
}

// Usage
const signal = new Signal("sensor.data");
signal.getPayload().setNumber("value", 25.5);
const terminalIds = ["terminal-1", "terminal-2", "terminal-3"];
const results = await sendToMultipleTerminals(sdk, terminalIds, signal);

// Tally
const successCount = [...results.values()].reduce(
  (count, outcome) => (outcome.success ? count + 1 : count),
  0
);
const totalCount = results.size;
console.log(`\n发送完成: ${successCount}/${totalCount} 成功`);

配置验证

import { Config } from 'streamind-sdk';

/**
 * Checks that a configuration has every required field and a secure endpoint.
 *
 * Checks run in a fixed order and stop at the first failure: deviceId,
 * deviceType, endpoint (present, then `wss://` scheme), tenantId, productId,
 * productKey.
 *
 * @param config Configuration to inspect.
 * @returns `valid` plus a message naming the first problem found.
 */
function validateConfig(config: Config): { valid: boolean; message: string } {
  const rejected = (message: string) => ({ valid: false, message });

  for (const field of ["deviceId", "deviceType"] as const) {
    if (!config[field]) {
      return rejected(`${field} 不能为空`);
    }
  }

  const endpoint = config.endpoint;
  if (!endpoint) {
    return rejected("endpoint 不能为空");
  }
  if (!endpoint.startsWith("wss://")) {
    return rejected("endpoint 必须使用 wss:// 协议");
  }

  for (const field of ["tenantId", "productId", "productKey"] as const) {
    if (!config[field]) {
      return rejected(`${field} 不能为空`);
    }
  }

  return { valid: true, message: "配置有效" };
}

// Usage
const config = new Config({
  // ... configuration parameters
});
const result = validateConfig(config);
if (!result.valid) {
  console.error(`配置无效: ${result.message}`);
} else {
  sdk.registerTerminal("terminal-1", config);
}

数据验证

import { Signal } from 'streamind-sdk';

/** Validation helpers for sensor readings. */
class SensorDataValidator {
  /**
   * Validates a temperature reading.
   *
   * @param value Temperature in degrees Celsius.
   * @returns `valid: false` with a reason when the value is not a number
   *          (including `NaN`) or lies outside the range -50..150.
   */
  static validateTemperature(value: number): { valid: boolean; message: string } {
    // NaN has typeof 'number' and fails both range comparisons below, so it
    // must be rejected explicitly or it would be reported as valid.
    if (typeof value !== 'number' || Number.isNaN(value)) {
      return { valid: false, message: "温度必须是数字" };
    }
    if (value < -50 || value > 150) {
      return { valid: false, message: `温度超出范围: ${value}°C` };
    }
    return { valid: true, message: "有效" };
  }

  /**
   * Builds a "sensor.temperature" signal after validating the reading.
   *
   * @param temperature Temperature in degrees Celsius.
   * @returns Signal carrying `value` and `unit: "celsius"`.
   * @throws Error when the reading fails validation.
   */
  static createValidatedSignal(temperature: number): Signal {
    // Refer to the class by name so the method still works when detached
    // from the class (e.g. passed as a callback).
    const result = SensorDataValidator.validateTemperature(temperature);
    if (!result.valid) {
      throw new Error(`数据验证失败: ${result.message}`);
    }
    const signal = new Signal("sensor.temperature");
    signal.getPayload().setNumber("value", temperature);
    signal.getPayload().setString("unit", "celsius");
    return signal;
  }
}

// Usage
try {
  const signal = SensorDataValidator.createValidatedSignal(25.5);
  await sdk.sendSignal("terminal-1", signal);
} catch (error) {
  // Report every failure; a thrown value is not guaranteed to be an Error.
  const reason = error instanceof Error ? error.message : String(error);
  console.error(`验证错误: ${reason}`);
}

日志记录

import { SDK, Config, Signal } from 'streamind-sdk';
import * as winston from 'winston';

// Logger setup: errors go to error.log, everything goes to combined.log,
// and a human-readable copy goes to the console.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  transports: [
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' }),
    new winston.transports.Console({
      format: winston.format.simple()
    })
  ]
});

/**
 * Converts a thrown value into a plain object that survives JSON
 * serialization. An Error's `message` and `stack` are non-enumerable, so
 * logging the Error itself as metadata would be written out as `{}`.
 */
function describeError(error: unknown): { name?: string; message: string; stack?: string } {
  if (error instanceof Error) {
    return { name: error.name, message: error.message, stack: error.stack };
  }
  return { message: String(error) };
}

/**
 * Connects, sends one reading, idles for a minute and disconnects, logging
 * each step. Any failure is logged with its details and rethrown.
 */
async function main() {
  try {
    const sdk = new SDK();
    const config = new Config({
      // ... configuration parameters
    });
    sdk.registerTerminal("terminal-1", config);

    logger.info("开始连接到 StreamInd 平台");
    await sdk.connect("terminal-1");
    logger.info("连接成功");

    const signal = new Signal("sensor.data");
    signal.getPayload().setNumber("value", 25.5);

    logger.info("发送传感器数据");
    await sdk.sendSignal("terminal-1", signal);
    logger.info("数据发送成功");

    await new Promise(resolve => setTimeout(resolve, 60000));

    logger.info("断开连接");
    await sdk.disconnect("terminal-1");
    logger.info("已断开连接");
  } catch (error) {
    logger.error("应用程序错误", { error: describeError(error) });
    throw error;
  }
}

main().catch(error => {
  logger.error("未处理的错误", { error: describeError(error) });
  process.exit(1);
});

优雅退出

import { SDK, Config } from 'streamind-sdk';

/**
 * Long-running application wrapper that connects a terminal, runs a main
 * loop until asked to stop, and releases its connections on the way out.
 */
class Application {
  private sdk: SDK;
  private running: boolean = true;

  constructor() {
    this.sdk = new SDK();
  }

  /**
   * Registers and connects the terminal, then runs the main loop until
   * `stop()` is called. Resolves after cleanup has finished.
   *
   * @throws Whatever `connect` throws; in that case the loop never starts.
   */
  async start(): Promise<void> {
    const config = new Config({
      // ... configuration parameters
    });
    this.sdk.registerTerminal("terminal-1", config);

    // Connect
    await this.sdk.connect("terminal-1");
    console.log("应用已启动");

    try {
      // Main loop; `running` is re-checked once per iteration, so a stop
      // request takes effect within about one second.
      while (this.running) {
        try {
          // Application logic...
          await new Promise(resolve => setTimeout(resolve, 1000));
        } catch (error) {
          console.error(`运行时错误: ${error}`);
        }
      }
    } finally {
      // Run cleanup on every exit path once connected, not only when the
      // loop ends normally.
      await this.cleanup();
    }
  }

  /** Disconnects everything; failures are logged rather than thrown. */
  async cleanup(): Promise<void> {
    console.log("正在清理资源...");
    try {
      await this.sdk.disconnectAll();
      console.log("资源清理完成");
    } catch (error) {
      console.error(`清理时出错: ${error}`);
    }
  }

  /** Asks the main loop to finish after its current iteration. */
  stop(): void {
    console.log("收到停止信号");
    this.running = false;
  }
}

// Usage
async function main() {
  const app = new Application();

  // Signal handling: turn SIGINT/SIGTERM into a graceful stop request
  process.on('SIGINT', () => {
    app.stop();
  });
  process.on('SIGTERM', () => {
    app.stop();
  });

  // Exit status reflects the outcome, so supervisors and scripts can tell a
  // failed run from a clean shutdown.
  let exitCode = 0;
  try {
    await app.start();
  } catch (error) {
    console.error(`应用错误: ${error}`);
    exitCode = 1;
  } finally {
    console.log("应用已退出");
    process.exit(exitCode);
  }
}

main();

最佳实践

  1. 异常捕获 - 捕获所有可能的异常,避免程序崩溃
  2. 重试机制 - 网络操作使用指数退避重试
  3. 超时设置 - 为所有异步操作设置合理超时
  4. 日志记录 - 记录所有关键操作和错误
  5. 优雅退出 - 确保资源正确释放
  6. 数据验证 - 发送前验证数据有效性
  7. 状态监控 - 实时监控连接状态并自动恢复

相关文档

Last updated on