Skip to Content

SDK 类

SDK 类是 StreamInd Python SDK 的核心入口,用于管理终端连接、发送信号和处理指令。

导入

from streamind_sdk import SDK

构造函数

sdk = SDK()

创建一个新的 SDK 实例。不需要传入任何参数。

方法

register_terminal

sdk.register_terminal(terminal_id: str, config: Config) -> None

注册一个新的终端。

参数

  • terminal_id (str) - 终端的唯一标识符
  • config (Config) - 终端的配置对象

示例

config = Config(...) sdk.register_terminal("terminal-1", config)

connect

await sdk.connect(terminal_id: str) -> None

异步连接指定的终端到 StreamInd 平台。

参数

  • terminal_id (str) - 要连接的终端ID

返回

  • 无返回值,连接成功时返回,失败时抛出异常

异常

  • Exception - 连接失败时抛出

示例

try: await sdk.connect("terminal-1") print("连接成功") except Exception as e: print(f"连接失败: {e}")

connect_all

await sdk.connect_all() -> Dict[str, bool]

异步连接所有已注册的终端。

返回

  • Dict[str, bool] - 字典,键为终端ID,值为连接是否成功

示例

results = await sdk.connect_all() for terminal_id, success in results.items(): print(f"{terminal_id}: {'成功' if success else '失败'}")

disconnect

await sdk.disconnect(terminal_id: str) -> None

异步断开指定终端的连接。

参数

  • terminal_id (str) - 要断开的终端ID

示例

await sdk.disconnect("terminal-1")

disconnect_all

await sdk.disconnect_all() -> None

异步断开所有终端的连接。

示例

await sdk.disconnect_all()

send_signal

await sdk.send_signal(terminal_id: str, signal: Signal) -> None

向指定终端发送信号。

参数

  • terminal_id (str) - 目标终端ID
  • signal (Signal) - 要发送的信号对象

异常

  • Exception - 发送失败时抛出

示例

signal = Signal("sensor.data") signal.get_payload().set_number("temperature", 25.5) await sdk.send_signal("terminal-1", signal)

send_audio_data

await sdk.send_audio_data(terminal_id: str, audio_data: bytes) -> None

向指定终端发送 OPUS 格式的音频数据。

参数

  • terminal_id (str) - 目标终端ID
  • audio_data (bytes) - OPUS 格式的音频二进制数据

异常

  • Exception - 发送失败时抛出

示例

with open("audio.opus", "rb") as f: audio_data = f.read() await sdk.send_audio_data("terminal-1", audio_data)

set_directive_callback

sdk.set_directive_callback(terminal_id: str, callback: Callable[[Directive], None]) -> None

设置接收指令的回调函数。

参数

  • terminal_id (str) - 终端ID
  • callback (Callable) - 回调函数,接收一个 Directive 参数

回调函数签名

def callback(directive: Directive) -> None: pass

示例

def on_directive(directive): print(f"收到指令: {directive.name}") payload = directive.get_payload() # 处理指令... sdk.set_directive_callback("terminal-1", on_directive)

异步回调

async def on_directive(directive): # 异步处理... await some_async_operation() sdk.set_directive_callback("terminal-1", on_directive)

set_connection_callback

sdk.set_connection_callback( terminal_id: str, callback: Callable[[str, str], None] ) -> None

设置连接状态变化的回调函数。

参数

  • terminal_id (str) - 终端ID
  • callback (Callable) - 回调函数,接收状态和消息两个参数

回调函数签名

def callback(status: str, message: str) -> None: pass

状态值

  • "connected" - 已连接
  • "disconnected" - 已断开
  • "reconnecting" - 重连中
  • "error" - 错误

示例

def on_connection(status, message): if status == "connected": print("连接成功") elif status == "disconnected": print(f"连接断开: {message}") elif status == "reconnecting": print(f"正在重连: {message}") elif status == "error": print(f"连接错误: {message}") sdk.set_connection_callback("terminal-1", on_connection)

完整示例

import asyncio from streamind_sdk import SDK, Config, Signal async def main(): # 创建SDK sdk = SDK() # 创建配置 config = Config( device_id="device-001", device_type="sensor", endpoint="wss://your-platform.com/signals", tenant_id="your-tenant-id", product_id="your-product-id", product_key="your-secret-key" ) # 注册终端 sdk.register_terminal("terminal-1", config) # 设置回调 def on_directive(directive): print(f"指令: {directive.name}") def on_connection(status, message): print(f"状态: {status}, {message}") sdk.set_directive_callback("terminal-1", on_directive) sdk.set_connection_callback("terminal-1", on_connection) # 连接 await sdk.connect("terminal-1") # 发送信号 signal = Signal("sensor.data") signal.get_payload().set_number("value", 25.5) await sdk.send_signal("terminal-1", signal) # 发送音频 with open("audio.opus", "rb") as f: await sdk.send_audio_data("terminal-1", f.read()) # 保持连接 await asyncio.sleep(60) # 断开连接 await sdk.disconnect("terminal-1") asyncio.run(main())

线程安全

SDK 类不是线程安全的。如果需要在多线程环境中使用,请为每个线程创建独立的 SDK 实例,或使用适当的同步机制。

最佳实践

  1. 单一 SDK 实例 - 在应用中使用单个 SDK 实例管理所有终端
  2. 错误处理 - 始终使用 try-except 包裹连接和发送操作
  3. 优雅关闭 - 应用退出前调用 disconnect_all()
  4. 回调轻量 - 回调函数中避免阻塞操作,使用异步处理
  5. 资源管理 - 使用 asyncio 的上下文管理器管理连接生命周期
Last updated on