基础使用
最基本的 StreamInd Python SDK 使用示例,展示连接、发送信号和接收指令。
完整示例
import asyncio
from streamind_sdk import SDK, Config, Signal
async def main():
# 1. 创建配置
config = Config(
device_id="device-001",
device_type="sensor",
endpoint="wss://your-platform.com/signals",
tenant_id="your-tenant-id",
product_id="your-product-id",
product_key="your-secret-key"
)
# 2. 创建 SDK 实例
sdk = SDK()
# 3. 注册终端
sdk.register_terminal("terminal-1", config)
# 4. 设置指令回调
def on_directive(directive):
print(f"\n收到指令: {directive.name}")
payload = directive.get_payload()
# 处理不同类型的指令
if directive.name == "device.control":
action = payload.get_string("action")
print(f"执行动作: {action}")
# 5. 设置连接状态回调
def on_connection(status, message):
print(f"连接状态: {status}")
if message:
print(f"消息: {message}")
sdk.set_directive_callback("terminal-1", on_directive)
sdk.set_connection_callback("terminal-1", on_connection)
# 6. 连接到平台
print("正在连接...")
await sdk.connect("terminal-1")
print("已连接到 StreamInd 平台")
# 7. 发送信号
signal = Signal("sensor.data")
payload = signal.get_payload()
payload.set_number("temperature", 25.5)
payload.set_number("humidity", 60.0)
payload.set_string("location", "lab_01")
await sdk.send_signal("terminal-1", signal)
print("信号已发送")
# 8. 保持连接一段时间
print("保持连接 60 秒...")
await asyncio.sleep(60)
# 9. 断开连接
print("正在断开连接...")
await sdk.disconnect("terminal-1")
print("已断开连接")
# 运行程序
if __name__ == "__main__":
asyncio.run(main())运行示例
- 保存代码为
basic_usage.py - 替换配置信息(tenant_id, product_id, product_key)
- 运行:
python basic_usage.py预期输出
正在连接...
连接状态: connected
消息: Connection established
已连接到 StreamInd 平台
信号已发送
保持连接 60 秒...
收到指令: device.control
执行动作: check_status
正在断开连接...
连接状态: disconnected
消息: Connection closed
已断开连接代码说明
配置创建
config = Config(
device_id="device-001", # 设备唯一 ID
device_type="sensor", # 设备类型
endpoint="wss://...", # WebSocket 端点
tenant_id="...", # 租户 ID
product_id="...", # 产品 ID
product_key="..." # 产品密钥
)回调设置
# 指令回调:接收平台下发的指令
def on_directive(directive):
print(f"收到指令: {directive.name}")
# 连接状态回调:监控连接状态变化
def on_connection(status, message):
print(f"状态: {status}, 消息: {message}")发送信号
signal = Signal("sensor.data") # 创建信号
payload = signal.get_payload() # 获取载荷
payload.set_number("temperature", 25.5) # 添加数据
await sdk.send_signal("terminal-1", signal) # 发送简化版本
如果不需要回调,可以使用更简洁的版本:
import asyncio
from streamind_sdk import SDK, Config, Signal
async def main():
# 配置和注册
config = Config(...)
sdk = SDK()
sdk.register_terminal("terminal-1", config)
# 连接
await sdk.connect("terminal-1")
# 发送信号
signal = Signal("sensor.data")
signal.get_payload().set_number("value", 25.5)
await sdk.send_signal("terminal-1", signal)
# 保持连接
await asyncio.sleep(60)
# 断开
await sdk.disconnect("terminal-1")
asyncio.run(main())使用环境变量
安全管理认证信息:
import os
import asyncio
from dotenv import load_dotenv
from streamind_sdk import SDK, Config, Signal
# 加载 .env 文件
load_dotenv()
async def main():
config = Config(
device_id=os.getenv("DEVICE_ID", "device-001"),
device_type=os.getenv("DEVICE_TYPE", "sensor"),
endpoint=os.getenv("STREAMIND_ENDPOINT"),
tenant_id=os.getenv("STREAMIND_TENANT_ID"),
product_id=os.getenv("STREAMIND_PRODUCT_ID"),
product_key=os.getenv("STREAMIND_PRODUCT_KEY")
)
sdk = SDK()
sdk.register_terminal("terminal-1", config)
await sdk.connect("terminal-1")
# ... 发送信号等操作 ...
await sdk.disconnect("terminal-1")
asyncio.run(main())创建 .env 文件:
DEVICE_ID=sensor-001
DEVICE_TYPE=temperature_sensor
STREAMIND_ENDPOINT=wss://api.streamind.com/signals
STREAMIND_TENANT_ID=your-tenant-id
STREAMIND_PRODUCT_ID=your-product-id
STREAMIND_PRODUCT_KEY=your-secret-key定时发送数据
每隔一段时间发送传感器数据:
import asyncio
from streamind_sdk import SDK, Config, Signal
async def send_periodic_data(sdk, terminal_id, interval=10):
"""定时发送传感器数据"""
counter = 0
while True:
# 创建信号
signal = Signal("sensor.data")
payload = signal.get_payload()
payload.set_number("temperature", 20.0 + counter % 10)
payload.set_number("counter", counter)
# 发送
try:
await sdk.send_signal(terminal_id, signal)
print(f"[{counter}] 数据已发送")
except Exception as e:
print(f"发送失败: {e}")
counter += 1
await asyncio.sleep(interval)
async def main():
config = Config(...)
sdk = SDK()
sdk.register_terminal("terminal-1", config)
await sdk.connect("terminal-1")
print("已连接,开始定时发送数据...")
# 定时发送数据
await send_periodic_data(sdk, "terminal-1", interval=5)
asyncio.run(main())下一步
Last updated on