Skip to Content

Directive 指令

Directive 类表示从 StreamInd 平台接收到的指令。平台通过指令向设备下发控制命令或请求。

接收指令

通过回调函数接收指令:

def on_directive(directive: Directive): print(f"收到指令: {directive.name}") sdk.set_directive_callback("terminal-1", on_directive)

属性

name

directive.name # 只读

获取指令的名称。

类型str

示例

def on_directive(directive): print(f"指令名称: {directive.name}") # 输出: 指令名称: device.control

方法

get_payload

payload = directive.get_payload() -> Payload

获取指令的载荷对象,用于读取指令参数。

返回

  • Payload - 载荷对象

示例

def on_directive(directive): payload = directive.get_payload() action = payload.get_string("action") value = payload.get_number("value")

完整示例

处理设备控制指令

from streamind_sdk import SDK, Config, Signal async def main(): sdk = SDK() config = Config(...) sdk.register_terminal("terminal-1", config) # 设置指令回调 def on_directive(directive): print(f"\n收到指令: {directive.name}") if directive.name == "device.control": handle_control(directive) elif directive.name == "device.query": handle_query(directive) else: print(f"未知指令类型: {directive.name}") def handle_control(directive): payload = directive.get_payload() action = payload.get_string("action") value = payload.get_number("value") print(f"执行动作: {action}, 参数: {value}") # 执行控制逻辑 if action == "set_temperature": set_temperature(value) elif action == "set_power": set_power(value > 0) def handle_query(directive): payload = directive.get_payload() query_type = payload.get_string("type") print(f"查询类型: {query_type}") # 响应查询 if query_type == "status": send_status_signal(sdk) sdk.set_directive_callback("terminal-1", on_directive) await sdk.connect("terminal-1") await asyncio.sleep(3600) # 保持运行 asyncio.run(main())

异步处理指令

import asyncio async def on_directive(directive): """异步回调函数""" print(f"收到指令: {directive.name}") payload = directive.get_payload() action = payload.get_string("action") # 执行异步操作 if action == "capture_image": await capture_and_upload_image() elif action == "run_diagnosis": result = await run_diagnosis() await send_result(result) sdk.set_directive_callback("terminal-1", on_directive)

指令响应确认

async def on_directive(directive): """处理指令并发送确认""" print(f"收到指令: {directive.name}") try: # 执行指令 payload = directive.get_payload() action = payload.get_string("action") await execute_action(action) # 发送成功确认 ack_signal = Signal("directive.ack") ack_payload = ack_signal.get_payload() ack_payload.set_string("directive_name", directive.name) ack_payload.set_boolean("success", True) await sdk.send_signal("terminal-1", ack_signal) except Exception as e: # 发送失败确认 ack_signal = Signal("directive.ack") ack_payload = ack_signal.get_payload() ack_payload.set_string("directive_name", directive.name) ack_payload.set_boolean("success", False) ack_payload.set_string("error", str(e)) await sdk.send_signal("terminal-1", ack_signal) sdk.set_directive_callback("terminal-1", on_directive)

常见指令类型

设备控制

def on_directive(directive): if directive.name == "device.control": payload = directive.get_payload() command = payload.get_string("command") if command == "power_on": device.power_on() elif command == "power_off": device.power_off() elif command == "restart": device.restart()

参数配置

def on_directive(directive): if directive.name == "device.config": payload = directive.get_payload() # 读取配置参数 interval = payload.get_number("report_interval") threshold = payload.get_number("threshold") enabled = payload.get_boolean("enabled") # 更新设备配置 device.update_config( report_interval=interval, threshold=threshold, enabled=enabled )

数据查询

async def on_directive(directive): if directive.name == "device.query": payload = directive.get_payload() query_type = payload.get_string("type") # 根据查询类型返回数据 if query_type == "status": status_signal = Signal("device.status") status_signal.get_payload().set_string("state", "running") await sdk.send_signal("terminal-1", status_signal) elif query_type == "metrics": metrics_signal = Signal("device.metrics") metrics_signal.get_payload().set_number("cpu", 45.2) metrics_signal.get_payload().set_number("memory", 68.5) await sdk.send_signal("terminal-1", metrics_signal)

指令路由

使用字典实现指令路由:

# 定义指令处理器 async def handle_control(directive): payload = directive.get_payload() # 处理控制指令... async def handle_config(directive): payload = directive.get_payload() # 处理配置指令... async def handle_query(directive): payload = directive.get_payload() # 处理查询指令... # 指令路由表 DIRECTIVE_HANDLERS = { "device.control": handle_control, "device.config": handle_config, "device.query": handle_query } # 统一回调 async def on_directive(directive): handler = DIRECTIVE_HANDLERS.get(directive.name) if handler: await handler(directive) else: print(f"未知指令: {directive.name}") sdk.set_directive_callback("terminal-1", on_directive)

错误处理

async def on_directive(directive): try: print(f"处理指令: {directive.name}") payload = directive.get_payload() # 参数验证 if not payload.has_key("action"): raise ValueError("缺少 action 参数") action = payload.get_string("action") # 执行操作 await execute_action(action) except ValueError as e: print(f"参数错误: {e}") except Exception as e: print(f"处理指令失败: {e}") sdk.set_directive_callback("terminal-1", on_directive)

最佳实践

  1. 非阻塞处理 - 使用异步回调避免阻塞
  2. 错误处理 - 捕获异常防止程序崩溃
  3. 确认机制 - 发送确认信号告知平台执行结果
  4. 参数验证 - 验证指令参数完整性和有效性
  5. 日志记录 - 记录接收和处理的指令

相关 API

Last updated on