SDK 类
SDK 类是 StreamInd Python SDK 的核心入口,用于管理终端连接、发送信号和处理指令。
导入
from streamind_sdk import SDK构造函数
sdk = SDK()创建一个新的 SDK 实例。不需要传入任何参数。
方法
register_terminal
sdk.register_terminal(terminal_id: str, config: Config) -> None注册一个新的终端。
参数:
terminal_id(str) - 终端的唯一标识符config(Config) - 终端的配置对象
示例:
config = Config(...)
sdk.register_terminal("terminal-1", config)connect
await sdk.connect(terminal_id: str) -> None异步连接指定的终端到 StreamInd 平台。
参数:
terminal_id(str) - 要连接的终端ID
返回:
- 无返回值,连接成功时返回,失败时抛出异常
异常:
Exception- 连接失败时抛出
示例:
try:
await sdk.connect("terminal-1")
print("连接成功")
except Exception as e:
print(f"连接失败: {e}")connect_all
await sdk.connect_all() -> Dict[str, bool]异步连接所有已注册的终端。
返回:
Dict[str, bool]- 字典,键为终端ID,值为连接是否成功
示例:
results = await sdk.connect_all()
for terminal_id, success in results.items():
print(f"{terminal_id}: {'成功' if success else '失败'}")disconnect
await sdk.disconnect(terminal_id: str) -> None异步断开指定终端的连接。
参数:
terminal_id(str) - 要断开的终端ID
示例:
await sdk.disconnect("terminal-1")disconnect_all
await sdk.disconnect_all() -> None异步断开所有终端的连接。
示例:
await sdk.disconnect_all()send_signal
await sdk.send_signal(terminal_id: str, signal: Signal) -> None向指定终端发送信号。
参数:
terminal_id(str) - 目标终端IDsignal(Signal) - 要发送的信号对象
异常:
Exception- 发送失败时抛出
示例:
signal = Signal("sensor.data")
signal.get_payload().set_number("temperature", 25.5)
await sdk.send_signal("terminal-1", signal)send_audio_data
await sdk.send_audio_data(terminal_id: str, audio_data: bytes) -> None向指定终端发送 OPUS 格式的音频数据。
参数:
terminal_id(str) - 目标终端IDaudio_data(bytes) - OPUS 格式的音频二进制数据
异常:
Exception- 发送失败时抛出
示例:
with open("audio.opus", "rb") as f:
audio_data = f.read()
await sdk.send_audio_data("terminal-1", audio_data)set_directive_callback
sdk.set_directive_callback(terminal_id: str, callback: Callable[[Directive], None]) -> None设置接收指令的回调函数。
参数:
terminal_id(str) - 终端IDcallback(Callable) - 回调函数,接收一个 Directive 参数
回调函数签名:
def callback(directive: Directive) -> None:
pass示例:
def on_directive(directive):
print(f"收到指令: {directive.name}")
payload = directive.get_payload()
# 处理指令...
sdk.set_directive_callback("terminal-1", on_directive)异步回调:
async def on_directive(directive):
# 异步处理...
await some_async_operation()
sdk.set_directive_callback("terminal-1", on_directive)set_connection_callback
sdk.set_connection_callback(
terminal_id: str,
callback: Callable[[str, str], None]
) -> None设置连接状态变化的回调函数。
参数:
terminal_id(str) - 终端IDcallback(Callable) - 回调函数,接收状态和消息两个参数
回调函数签名:
def callback(status: str, message: str) -> None:
pass状态值:
"connected"- 已连接"disconnected"- 已断开"reconnecting"- 重连中"error"- 错误
示例:
def on_connection(status, message):
if status == "connected":
print("连接成功")
elif status == "disconnected":
print(f"连接断开: {message}")
elif status == "reconnecting":
print(f"正在重连: {message}")
elif status == "error":
print(f"连接错误: {message}")
sdk.set_connection_callback("terminal-1", on_connection)完整示例
import asyncio
from streamind_sdk import SDK, Config, Signal
async def main():
# 创建SDK
sdk = SDK()
# 创建配置
config = Config(
device_id="device-001",
device_type="sensor",
endpoint="wss://your-platform.com/signals",
tenant_id="your-tenant-id",
product_id="your-product-id",
product_key="your-secret-key"
)
# 注册终端
sdk.register_terminal("terminal-1", config)
# 设置回调
def on_directive(directive):
print(f"指令: {directive.name}")
def on_connection(status, message):
print(f"状态: {status}, {message}")
sdk.set_directive_callback("terminal-1", on_directive)
sdk.set_connection_callback("terminal-1", on_connection)
# 连接
await sdk.connect("terminal-1")
# 发送信号
signal = Signal("sensor.data")
signal.get_payload().set_number("value", 25.5)
await sdk.send_signal("terminal-1", signal)
# 发送音频
with open("audio.opus", "rb") as f:
await sdk.send_audio_data("terminal-1", f.read())
# 保持连接
await asyncio.sleep(60)
# 断开连接
await sdk.disconnect("terminal-1")
asyncio.run(main())线程安全
SDK 类不是线程安全的。如果需要在多线程环境中使用,请为每个线程创建独立的 SDK 实例,或使用适当的同步机制。
最佳实践
- 单一 SDK 实例 - 在应用中使用单个 SDK 实例管理所有终端
- 错误处理 - 始终使用 try-except 包裹连接和发送操作
- 优雅关闭 - 应用退出前调用
disconnect_all() - 回调轻量 - 回调函数中避免阻塞操作,使用异步处理
- 资源管理 - 使用 asyncio 的上下文管理器管理连接生命周期
Last updated on