Directive 指令
Directive 类表示从 StreamInd 平台接收到的指令。平台通过指令向设备下发控制命令或请求。
接收指令
通过回调函数接收指令:
def on_directive(directive: Directive):
print(f"收到指令: {directive.name}")
sdk.set_directive_callback("terminal-1", on_directive)属性
name
directive.name # 只读获取指令的名称。
类型:str
示例:
def on_directive(directive):
print(f"指令名称: {directive.name}")
# 输出: 指令名称: device.control方法
get_payload
payload = directive.get_payload() -> Payload获取指令的载荷对象,用于读取指令参数。
返回:
Payload- 载荷对象
示例:
def on_directive(directive):
payload = directive.get_payload()
action = payload.get_string("action")
value = payload.get_number("value")完整示例
处理设备控制指令
from streamind_sdk import SDK, Config, Signal
async def main():
sdk = SDK()
config = Config(...)
sdk.register_terminal("terminal-1", config)
# 设置指令回调
def on_directive(directive):
print(f"\n收到指令: {directive.name}")
if directive.name == "device.control":
handle_control(directive)
elif directive.name == "device.query":
handle_query(directive)
else:
print(f"未知指令类型: {directive.name}")
def handle_control(directive):
payload = directive.get_payload()
action = payload.get_string("action")
value = payload.get_number("value")
print(f"执行动作: {action}, 参数: {value}")
# 执行控制逻辑
if action == "set_temperature":
set_temperature(value)
elif action == "set_power":
set_power(value > 0)
def handle_query(directive):
payload = directive.get_payload()
query_type = payload.get_string("type")
print(f"查询类型: {query_type}")
# 响应查询
if query_type == "status":
send_status_signal(sdk)
sdk.set_directive_callback("terminal-1", on_directive)
await sdk.connect("terminal-1")
await asyncio.sleep(3600) # 保持运行
asyncio.run(main())异步处理指令
import asyncio
async def on_directive(directive):
"""异步回调函数"""
print(f"收到指令: {directive.name}")
payload = directive.get_payload()
action = payload.get_string("action")
# 执行异步操作
if action == "capture_image":
await capture_and_upload_image()
elif action == "run_diagnosis":
result = await run_diagnosis()
await send_result(result)
sdk.set_directive_callback("terminal-1", on_directive)指令响应确认
async def on_directive(directive):
"""处理指令并发送确认"""
print(f"收到指令: {directive.name}")
try:
# 执行指令
payload = directive.get_payload()
action = payload.get_string("action")
await execute_action(action)
# 发送成功确认
ack_signal = Signal("directive.ack")
ack_payload = ack_signal.get_payload()
ack_payload.set_string("directive_name", directive.name)
ack_payload.set_boolean("success", True)
await sdk.send_signal("terminal-1", ack_signal)
except Exception as e:
# 发送失败确认
ack_signal = Signal("directive.ack")
ack_payload = ack_signal.get_payload()
ack_payload.set_string("directive_name", directive.name)
ack_payload.set_boolean("success", False)
ack_payload.set_string("error", str(e))
await sdk.send_signal("terminal-1", ack_signal)
sdk.set_directive_callback("terminal-1", on_directive)常见指令类型
设备控制
def on_directive(directive):
if directive.name == "device.control":
payload = directive.get_payload()
command = payload.get_string("command")
if command == "power_on":
device.power_on()
elif command == "power_off":
device.power_off()
elif command == "restart":
device.restart()参数配置
def on_directive(directive):
if directive.name == "device.config":
payload = directive.get_payload()
# 读取配置参数
interval = payload.get_number("report_interval")
threshold = payload.get_number("threshold")
enabled = payload.get_boolean("enabled")
# 更新设备配置
device.update_config(
report_interval=interval,
threshold=threshold,
enabled=enabled
)数据查询
async def on_directive(directive):
if directive.name == "device.query":
payload = directive.get_payload()
query_type = payload.get_string("type")
# 根据查询类型返回数据
if query_type == "status":
status_signal = Signal("device.status")
status_signal.get_payload().set_string("state", "running")
await sdk.send_signal("terminal-1", status_signal)
elif query_type == "metrics":
metrics_signal = Signal("device.metrics")
metrics_signal.get_payload().set_number("cpu", 45.2)
metrics_signal.get_payload().set_number("memory", 68.5)
await sdk.send_signal("terminal-1", metrics_signal)指令路由
使用字典实现指令路由:
# 定义指令处理器
async def handle_control(directive):
payload = directive.get_payload()
# 处理控制指令...
async def handle_config(directive):
payload = directive.get_payload()
# 处理配置指令...
async def handle_query(directive):
payload = directive.get_payload()
# 处理查询指令...
# 指令路由表
DIRECTIVE_HANDLERS = {
"device.control": handle_control,
"device.config": handle_config,
"device.query": handle_query
}
# 统一回调
async def on_directive(directive):
handler = DIRECTIVE_HANDLERS.get(directive.name)
if handler:
await handler(directive)
else:
print(f"未知指令: {directive.name}")
sdk.set_directive_callback("terminal-1", on_directive)错误处理
async def on_directive(directive):
try:
print(f"处理指令: {directive.name}")
payload = directive.get_payload()
# 参数验证
if not payload.has_key("action"):
raise ValueError("缺少 action 参数")
action = payload.get_string("action")
# 执行操作
await execute_action(action)
except ValueError as e:
print(f"参数错误: {e}")
except Exception as e:
print(f"处理指令失败: {e}")
sdk.set_directive_callback("terminal-1", on_directive)最佳实践
- 非阻塞处理 - 使用异步回调避免阻塞
- 错误处理 - 捕获异常防止程序崩溃
- 确认机制 - 发送确认信号告知平台执行结果
- 参数验证 - 验证指令参数完整性和有效性
- 日志记录 - 记录接收和处理的指令
相关 API
- Payload - 载荷数据操作
- SDK.set_directive_callback() - 设置指令回调
Last updated on