错误处理
完善的错误处理策略,确保应用稳定可靠。
基础错误处理
import { SDK, Config, Signal } from 'streamind-sdk';
async function main() {
const sdk = new SDK();
const config = new Config({
// ... 配置参数
});
sdk.registerTerminal("terminal-1", config);
// 连接错误处理
try {
await sdk.connect("terminal-1");
console.log("连接成功");
} catch (error) {
if (error instanceof Error) {
console.error(`连接失败: ${error.message}`);
}
return;
}
// 发送信号错误处理
const signal = new Signal("sensor.data");
signal.getPayload().setNumber("value", 25.5);
try {
await sdk.sendSignal("terminal-1", signal);
console.log("信号发送成功");
} catch (error) {
console.error(`发送失败: ${error}`);
}
// 断开连接
try {
await sdk.disconnect("terminal-1");
} catch (error) {
console.error(`断开连接时出错: ${error}`);
}
}
main().catch(console.error);重试机制
连接重试
import { SDK, Config } from 'streamind-sdk';
async function connectWithRetry(
sdk: SDK,
terminalId: string,
maxRetries: number = 3,
retryDelay: number = 5000
): Promise<boolean> {
// 带重试的连接
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
console.log(`连接尝试 ${attempt + 1}/${maxRetries}...`);
await sdk.connect(terminalId);
console.log("连接成功");
return true;
} catch (error) {
console.error(`连接失败: ${error}`);
if (attempt < maxRetries - 1) {
console.log(`等待 ${retryDelay / 1000} 秒后重试...`);
await new Promise(resolve => setTimeout(resolve, retryDelay));
} else {
console.log("达到最大重试次数,连接失败");
return false;
}
}
}
return false;
}
async function main() {
const sdk = new SDK();
const config = new Config({
// ... 配置参数
});
sdk.registerTerminal("terminal-1", config);
// 带重试的连接
const success = await connectWithRetry(sdk, "terminal-1", 5);
if (success) {
// 继续后续操作
await new Promise(resolve => setTimeout(resolve, 60000));
await sdk.disconnect("terminal-1");
} else {
console.log("无法建立连接");
}
}
main().catch(console.error);发送重试
import { SDK, Signal } from 'streamind-sdk';
async function sendWithRetry(
sdk: SDK,
terminalId: string,
signal: Signal,
maxRetries: number = 3
): Promise<boolean> {
// 带重试的信号发送
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
await sdk.sendSignal(terminalId, signal);
console.log(`信号发送成功(尝试 ${attempt + 1})`);
return true;
} catch (error) {
console.error(`发送失败(尝试 ${attempt + 1}): ${error}`);
if (attempt < maxRetries - 1) {
await new Promise(resolve => setTimeout(resolve, 1000));
} else {
console.log("达到最大重试次数,发送失败");
return false;
}
}
}
return false;
}
// 使用
const signal = new Signal("sensor.data");
signal.getPayload().setNumber("value", 25.5);
const success = await sendWithRetry(sdk, "terminal-1", signal, 5);指数退避重试
import { SDK, Signal } from 'streamind-sdk';
async function sendWithExponentialBackoff(
sdk: SDK,
terminalId: string,
signal: Signal,
maxRetries: number = 5
): Promise<boolean> {
// 指数退避重试
const baseDelay = 1000; // 基础延迟(毫秒)
const maxDelay = 60000; // 最大延迟(毫秒)
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
await sdk.sendSignal(terminalId, signal);
console.log(`发送成功(尝试 ${attempt + 1})`);
return true;
} catch (error) {
console.error(`发送失败(尝试 ${attempt + 1}): ${error}`);
if (attempt < maxRetries - 1) {
// 计算延迟时间:2^attempt * baseDelay + 随机抖动
const delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
const jitter = Math.random() * 0.1 * delay; // 10% 抖动
const totalDelay = delay + jitter;
console.log(`等待 ${(totalDelay / 1000).toFixed(2)} 秒后重试...`);
await new Promise(resolve => setTimeout(resolve, totalDelay));
} else {
console.log("达到最大重试次数");
return false;
}
}
}
return false;
}
// 使用
const signal = new Signal("sensor.data");
const success = await sendWithExponentialBackoff(sdk, "terminal-1", signal);超时处理
import { SDK, Signal } from 'streamind-sdk';
async function connectWithTimeout(
sdk: SDK,
terminalId: string,
timeout: number = 10000
): Promise<boolean> {
// 带超时的连接
try {
await Promise.race([
sdk.connect(terminalId),
new Promise((_, reject) =>
setTimeout(() => reject(new Error("Connection timeout")), timeout)
)
]);
console.log("连接成功");
return true;
} catch (error) {
if (error instanceof Error && error.message === "Connection timeout") {
console.error(`连接超时(${timeout / 1000}秒)`);
} else {
console.error(`连接失败: ${error}`);
}
return false;
}
}
async function sendWithTimeout(
sdk: SDK,
terminalId: string,
signal: Signal,
timeout: number = 5000
): Promise<boolean> {
// 带超时的发送
try {
await Promise.race([
sdk.sendSignal(terminalId, signal),
new Promise((_, reject) =>
setTimeout(() => reject(new Error("Send timeout")), timeout)
)
]);
console.log("发送成功");
return true;
} catch (error) {
if (error instanceof Error && error.message === "Send timeout") {
console.error(`发送超时(${timeout / 1000}秒)`);
} else {
console.error(`发送失败: ${error}`);
}
return false;
}
}
// 使用
await connectWithTimeout(sdk, "terminal-1", 15000);
const signal = new Signal("sensor.data");
await sendWithTimeout(sdk, "terminal-1", signal, 3000);连接状态监控
import { SDK, Config } from 'streamind-sdk';
class ConnectionMonitor {
private sdk: SDK;
private terminalId: string;
private isConnected: boolean = false;
private reconnectAttempts: number = 0;
private maxReconnectAttempts: number = 10;
constructor(sdk: SDK, terminalId: string) {
this.sdk = sdk;
this.terminalId = terminalId;
}
setup(): void {
// 设置连接监控
this.sdk.setConnectionCallback(this.terminalId, (status, message) => {
if (status === "connected") {
this.isConnected = true;
this.reconnectAttempts = 0;
console.log("连接已建立");
} else if (status === "disconnected") {
this.isConnected = false;
console.log(`连接断开: ${message}`);
// 触发重连
this.autoReconnect();
} else if (status === "error") {
console.error(`连接错误: ${message}`);
}
});
}
async autoReconnect(): Promise<void> {
// 自动重连
while (!this.isConnected && this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`重连尝试 ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
try {
await new Promise(resolve => setTimeout(resolve, 5000)); // 等待5秒
await this.sdk.connect(this.terminalId);
} catch (error) {
console.error(`重连失败: ${error}`);
}
}
if (!this.isConnected) {
console.log("达到最大重连次数,放弃重连");
}
}
}
// 使用
async function main() {
const sdk = new SDK();
const config = new Config({
// ... 配置参数
});
sdk.registerTerminal("terminal-1", config);
// 设置监控
const monitor = new ConnectionMonitor(sdk, "terminal-1");
monitor.setup();
// 初始连接
await sdk.connect("terminal-1");
// 保持运行
await new Promise(resolve => setTimeout(resolve, 3600000));
}
main().catch(console.error);批量操作错误处理
import { SDK, Signal } from 'streamind-sdk';
interface SendResult {
success: boolean;
error?: string;
}
async function sendToMultipleTerminals(
sdk: SDK,
terminalIds: string[],
signal: Signal
): Promise<Map<string, SendResult>> {
// 向多个终端发送,处理部分失败
const results = new Map<string, SendResult>();
const tasks = terminalIds.map(async (terminalId) => {
try {
await sdk.sendSignal(terminalId, signal);
results.set(terminalId, { success: true });
console.log(`${terminalId}: 发送成功`);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
results.set(terminalId, { success: false, error: errorMsg });
console.error(`${terminalId}: 发送失败 - ${errorMsg}`);
}
});
await Promise.all(tasks);
return results;
}
// 使用
const signal = new Signal("sensor.data");
signal.getPayload().setNumber("value", 25.5);
const terminalIds = ["terminal-1", "terminal-2", "terminal-3"];
const results = await sendToMultipleTerminals(sdk, terminalIds, signal);
// 统计
const successCount = Array.from(results.values()).filter(r => r.success).length;
const totalCount = results.size;
console.log(`\n发送完成: ${successCount}/${totalCount} 成功`);配置验证
import { Config } from 'streamind-sdk';
function validateConfig(config: Config): { valid: boolean; message: string } {
// 验证配置
if (!config.deviceId) {
return { valid: false, message: "deviceId 不能为空" };
}
if (!config.deviceType) {
return { valid: false, message: "deviceType 不能为空" };
}
if (!config.endpoint) {
return { valid: false, message: "endpoint 不能为空" };
}
if (!config.endpoint.startsWith("wss://")) {
return { valid: false, message: "endpoint 必须使用 wss:// 协议" };
}
if (!config.tenantId) {
return { valid: false, message: "tenantId 不能为空" };
}
if (!config.productId) {
return { valid: false, message: "productId 不能为空" };
}
if (!config.productKey) {
return { valid: false, message: "productKey 不能为空" };
}
return { valid: true, message: "配置有效" };
}
// 使用
const config = new Config({
// ... 配置参数
});
const result = validateConfig(config);
if (result.valid) {
sdk.registerTerminal("terminal-1", config);
} else {
console.error(`配置无效: ${result.message}`);
}数据验证
import { Signal } from 'streamind-sdk';
class SensorDataValidator {
// 传感器数据验证器
static validateTemperature(value: number): { valid: boolean; message: string } {
// 验证温度值
if (typeof value !== 'number') {
return { valid: false, message: "温度必须是数字" };
}
if (value < -50 || value > 150) {
return { valid: false, message: `温度超出范围: ${value}°C` };
}
return { valid: true, message: "有效" };
}
static createValidatedSignal(temperature: number): Signal {
// 创建经过验证的温度信号
const result = this.validateTemperature(temperature);
if (!result.valid) {
throw new Error(`数据验证失败: ${result.message}`);
}
const signal = new Signal("sensor.temperature");
signal.getPayload().setNumber("value", temperature);
signal.getPayload().setString("unit", "celsius");
return signal;
}
}
// 使用
try {
const signal = SensorDataValidator.createValidatedSignal(25.5);
await sdk.sendSignal("terminal-1", signal);
} catch (error) {
if (error instanceof Error) {
console.error(`验证错误: ${error.message}`);
}
}日志记录
import { SDK, Config, Signal } from 'streamind-sdk';
import * as winston from 'winston';
// 配置日志
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' }),
new winston.transports.Console({
format: winston.format.simple()
})
]
});
async function main() {
try {
const sdk = new SDK();
const config = new Config({
// ... 配置参数
});
sdk.registerTerminal("terminal-1", config);
logger.info("开始连接到 StreamInd 平台");
await sdk.connect("terminal-1");
logger.info("连接成功");
const signal = new Signal("sensor.data");
signal.getPayload().setNumber("value", 25.5);
logger.info("发送传感器数据");
await sdk.sendSignal("terminal-1", signal);
logger.info("数据发送成功");
await new Promise(resolve => setTimeout(resolve, 60000));
logger.info("断开连接");
await sdk.disconnect("terminal-1");
logger.info("已断开连接");
} catch (error) {
logger.error("应用程序错误", { error });
throw error;
}
}
main().catch(error => {
logger.error("未处理的错误", { error });
process.exit(1);
});优雅退出
import { SDK, Config } from 'streamind-sdk';
class Application {
private sdk: SDK;
private running: boolean = true;
constructor() {
this.sdk = new SDK();
}
async start(): Promise<void> {
// 启动应用
const config = new Config({
// ... 配置参数
});
this.sdk.registerTerminal("terminal-1", config);
// 连接
await this.sdk.connect("terminal-1");
console.log("应用已启动");
// 主循环
while (this.running) {
try {
// 应用逻辑...
await new Promise(resolve => setTimeout(resolve, 1000));
} catch (error) {
console.error(`运行时错误: ${error}`);
}
}
// 清理
await this.cleanup();
}
async cleanup(): Promise<void> {
// 清理资源
console.log("正在清理资源...");
try {
await this.sdk.disconnectAll();
console.log("资源清理完成");
} catch (error) {
console.error(`清理时出错: ${error}`);
}
}
stop(): void {
// 停止应用
console.log("收到停止信号");
this.running = false;
}
}
// 使用
async function main() {
const app = new Application();
// 设置信号处理
process.on('SIGINT', () => {
app.stop();
});
process.on('SIGTERM', () => {
app.stop();
});
try {
await app.start();
} catch (error) {
console.error(`应用错误: ${error}`);
} finally {
console.log("应用已退出");
process.exit(0);
}
}
main();最佳实践
- 异常捕获 - 捕获所有可能的异常,避免程序崩溃
- 重试机制 - 网络操作使用指数退避重试
- 超时设置 - 为所有异步操作设置合理超时
- 日志记录 - 记录所有关键操作和错误
- 优雅退出 - 确保资源正确释放
- 数据验证 - 发送前验证数据有效性
- 状态监控 - 实时监控连接状态并自动恢复
相关文档
Last updated on