Skip to Content
文档StreamindSDK 集成Python SDK示例代码基础使用

基础使用

最基本的 StreamInd Python SDK 使用示例,展示连接、发送信号和接收指令。

完整示例

import asyncio from streamind_sdk import SDK, Config, Signal async def main(): # 1. 创建配置 config = Config( device_id="device-001", device_type="sensor", endpoint="wss://your-platform.com/signals", tenant_id="your-tenant-id", product_id="your-product-id", product_key="your-secret-key" ) # 2. 创建 SDK 实例 sdk = SDK() # 3. 注册终端 sdk.register_terminal("terminal-1", config) # 4. 设置指令回调 def on_directive(directive): print(f"\n收到指令: {directive.name}") payload = directive.get_payload() # 处理不同类型的指令 if directive.name == "device.control": action = payload.get_string("action") print(f"执行动作: {action}") # 5. 设置连接状态回调 def on_connection(status, message): print(f"连接状态: {status}") if message: print(f"消息: {message}") sdk.set_directive_callback("terminal-1", on_directive) sdk.set_connection_callback("terminal-1", on_connection) # 6. 连接到平台 print("正在连接...") await sdk.connect("terminal-1") print("已连接到 StreamInd 平台") # 7. 发送信号 signal = Signal("sensor.data") payload = signal.get_payload() payload.set_number("temperature", 25.5) payload.set_number("humidity", 60.0) payload.set_string("location", "lab_01") await sdk.send_signal("terminal-1", signal) print("信号已发送") # 8. 保持连接一段时间 print("保持连接 60 秒...") await asyncio.sleep(60) # 9. 断开连接 print("正在断开连接...") await sdk.disconnect("terminal-1") print("已断开连接") # 运行程序 if __name__ == "__main__": asyncio.run(main())

运行示例

  1. 保存代码为 basic_usage.py
  2. 替换配置信息(tenant_id, product_id, product_key)
  3. 运行:
python basic_usage.py

预期输出

正在连接... 连接状态: connected 消息: Connection established 已连接到 StreamInd 平台 信号已发送 保持连接 60 秒... 收到指令: device.control 执行动作: check_status 正在断开连接... 连接状态: disconnected 消息: Connection closed 已断开连接

代码说明

配置创建

config = Config( device_id="device-001", # 设备唯一 ID device_type="sensor", # 设备类型 endpoint="wss://...", # WebSocket 端点 tenant_id="...", # 租户 ID product_id="...", # 产品 ID product_key="..." # 产品密钥 )

回调设置

# 指令回调:接收平台下发的指令 def on_directive(directive): print(f"收到指令: {directive.name}") # 连接状态回调:监控连接状态变化 def on_connection(status, message): print(f"状态: {status}, 消息: {message}")

发送信号

signal = Signal("sensor.data") # 创建信号 payload = signal.get_payload() # 获取载荷 payload.set_number("temperature", 25.5) # 添加数据 await sdk.send_signal("terminal-1", signal) # 发送

简化版本

如果不需要回调,可以使用更简洁的版本:

import asyncio from streamind_sdk import SDK, Config, Signal async def main(): # 配置和注册 config = Config(...) sdk = SDK() sdk.register_terminal("terminal-1", config) # 连接 await sdk.connect("terminal-1") # 发送信号 signal = Signal("sensor.data") signal.get_payload().set_number("value", 25.5) await sdk.send_signal("terminal-1", signal) # 保持连接 await asyncio.sleep(60) # 断开 await sdk.disconnect("terminal-1") asyncio.run(main())

使用环境变量

安全管理认证信息:

import os import asyncio from dotenv import load_dotenv from streamind_sdk import SDK, Config, Signal # 加载 .env 文件 load_dotenv() async def main(): config = Config( device_id=os.getenv("DEVICE_ID", "device-001"), device_type=os.getenv("DEVICE_TYPE", "sensor"), endpoint=os.getenv("STREAMIND_ENDPOINT"), tenant_id=os.getenv("STREAMIND_TENANT_ID"), product_id=os.getenv("STREAMIND_PRODUCT_ID"), product_key=os.getenv("STREAMIND_PRODUCT_KEY") ) sdk = SDK() sdk.register_terminal("terminal-1", config) await sdk.connect("terminal-1") # ... 发送信号等操作 ... await sdk.disconnect("terminal-1") asyncio.run(main())

创建 .env 文件

DEVICE_ID=sensor-001 DEVICE_TYPE=temperature_sensor STREAMIND_ENDPOINT=wss://api.streamind.com/signals STREAMIND_TENANT_ID=your-tenant-id STREAMIND_PRODUCT_ID=your-product-id STREAMIND_PRODUCT_KEY=your-secret-key

定时发送数据

每隔一段时间发送传感器数据:

import asyncio from streamind_sdk import SDK, Config, Signal async def send_periodic_data(sdk, terminal_id, interval=10): """定时发送传感器数据""" counter = 0 while True: # 创建信号 signal = Signal("sensor.data") payload = signal.get_payload() payload.set_number("temperature", 20.0 + counter % 10) payload.set_number("counter", counter) # 发送 try: await sdk.send_signal(terminal_id, signal) print(f"[{counter}] 数据已发送") except Exception as e: print(f"发送失败: {e}") counter += 1 await asyncio.sleep(interval) async def main(): config = Config(...) sdk = SDK() sdk.register_terminal("terminal-1", config) await sdk.connect("terminal-1") print("已连接,开始定时发送数据...") # 定时发送数据 await send_periodic_data(sdk, "terminal-1", interval=5) asyncio.run(main())

下一步

Last updated on