音频流传输
使用 StreamInd Python SDK 发送 OPUS 格式的音频流数据。
发送音频文件
完整示例
import asyncio
from streamind_sdk import SDK, Config
async def send_audio_file(sdk, terminal_id, audio_file_path):
"""读取并发送音频文件"""
try:
# 读取 OPUS 音频文件
with open(audio_file_path, "rb") as f:
audio_data = f.read()
print(f"读取音频文件: {audio_file_path}")
print(f"文件大小: {len(audio_data)} 字节")
# 发送音频数据
await sdk.send_audio_data(terminal_id, audio_data)
print("音频发送成功")
except FileNotFoundError:
print(f"错误:文件不存在 - {audio_file_path}")
except Exception as e:
print(f"发送音频失败: {e}")
async def main():
# 创建配置
config = Config(
device_id="audio-device-001",
device_type="audio_client",
endpoint="wss://your-platform.com/signals",
tenant_id="your-tenant-id",
product_id="your-product-id",
product_key="your-secret-key"
)
# 创建 SDK
sdk = SDK()
sdk.register_terminal("terminal-1", config)
# 连接
await sdk.connect("terminal-1")
print("已连接到平台")
# 发送音频文件
await send_audio_file(sdk, "terminal-1", "audio.opus")
# 保持连接
await asyncio.sleep(5)
# 断开连接
await sdk.disconnect("terminal-1")
asyncio.run(main())流式音频发送
分块发送大音频文件:
import asyncio
from streamind_sdk import SDK, Config, Signal
async def send_audio_stream(sdk, terminal_id, audio_file_path, chunk_size=4096):
"""流式发送音频数据"""
try:
with open(audio_file_path, "rb") as f:
chunk_index = 0
while True:
# 读取音频块
chunk = f.read(chunk_size)
if not chunk:
break
# 发送音频块
await sdk.send_audio_data(terminal_id, chunk)
chunk_index += 1
print(f"发送音频块 {chunk_index}: {len(chunk)} 字节")
# 控制发送速率(可选)
await asyncio.sleep(0.1)
print(f"音频流发送完成,共 {chunk_index} 块")
except Exception as e:
print(f"流式发送失败: {e}")
async def main():
config = Config(...)
sdk = SDK()
sdk.register_terminal("terminal-1", config)
await sdk.connect("terminal-1")
# 流式发送
await send_audio_stream(sdk, "terminal-1", "large_audio.opus", chunk_size=8192)
await sdk.disconnect("terminal-1")
asyncio.run(main())麦克风实时采集
使用 PyAudio 实时采集并发送音频:
import asyncio
import pyaudio
from streamind_sdk import SDK, Config
# 音频配置
RATE = 16000 # 采样率
CHANNELS = 1 # 单声道
CHUNK_SIZE = 1024 # 每次读取的帧数
async def stream_microphone_audio(sdk, terminal_id, duration=10):
"""实时采集麦克风音频并发送"""
p = pyaudio.PyAudio()
# 打开音频流
stream = p.open(
format=pyaudio.paInt16,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK_SIZE
)
print(f"开始录音 {duration} 秒...")
try:
start_time = asyncio.get_event_loop().time()
while True:
# 检查是否超时
if asyncio.get_event_loop().time() - start_time > duration:
break
# 读取音频数据
audio_chunk = stream.read(CHUNK_SIZE, exception_on_overflow=False)
# 发送到平台(这里需要先编码为 OPUS)
# 注意:实际使用中需要使用 opuslib 等库进行编码
await sdk.send_audio_data(terminal_id, audio_chunk)
await asyncio.sleep(0.01)
finally:
# 清理资源
stream.stop_stream()
stream.close()
p.terminate()
print("录音结束")
async def main():
config = Config(...)
sdk = SDK()
sdk.register_terminal("terminal-1", config)
await sdk.connect("terminal-1")
print("已连接,准备录音...")
# 录音并发送 10 秒
await stream_microphone_audio(sdk, "terminal-1", duration=10)
await sdk.disconnect("terminal-1")
asyncio.run(main())OPUS 编码
使用 opuslib 编码音频数据:
import asyncio
import pyaudio
import opuslib
from streamind_sdk import SDK, Config
# 音频参数
SAMPLE_RATE = 16000
CHANNELS = 1
FRAME_SIZE = 960 # 60ms @ 16kHz
async def stream_with_opus_encoding(sdk, terminal_id, duration=10):
"""录音、OPUS 编码并发送"""
# 初始化 PyAudio
p = pyaudio.PyAudio()
stream = p.open(
format=pyaudio.paInt16,
channels=CHANNELS,
rate=SAMPLE_RATE,
input=True,
frames_per_buffer=FRAME_SIZE
)
# 初始化 OPUS 编码器
encoder = opuslib.Encoder(SAMPLE_RATE, CHANNELS, opuslib.APPLICATION_VOIP)
print(f"开始录音并编码 {duration} 秒...")
try:
start_time = asyncio.get_event_loop().time()
while True:
if asyncio.get_event_loop().time() - start_time > duration:
break
# 读取 PCM 数据
pcm_data = stream.read(FRAME_SIZE, exception_on_overflow=False)
# OPUS 编码
opus_data = encoder.encode(pcm_data, FRAME_SIZE)
# 发送编码后的数据
await sdk.send_audio_data(terminal_id, opus_data)
await asyncio.sleep(0.01)
finally:
stream.stop_stream()
stream.close()
p.terminate()
print("录音结束")
async def main():
config = Config(...)
sdk = SDK()
sdk.register_terminal("terminal-1", config)
await sdk.connect("terminal-1")
# 录音、编码并发送
await stream_with_opus_encoding(sdk, "terminal-1", duration=10)
await sdk.disconnect("terminal-1")
asyncio.run(main())接收音频指令
接收平台发送的音频控制指令:
import asyncio
from streamind_sdk import SDK, Config
async def main():
config = Config(...)
sdk = SDK()
sdk.register_terminal("terminal-1", config)
# 音频状态
is_recording = False
# 指令回调
def on_directive(directive):
nonlocal is_recording
if directive.name == "audio.control":
payload = directive.get_payload()
action = payload.get_string("action")
if action == "start_recording":
print("开始录音")
is_recording = True
elif action == "stop_recording":
print("停止录音")
is_recording = False
sdk.set_directive_callback("terminal-1", on_directive)
await sdk.connect("terminal-1")
print("等待音频控制指令...")
# 保持连接
await asyncio.sleep(3600)
await sdk.disconnect("terminal-1")
asyncio.run(main())音频文件转换
将其他格式转换为 OPUS:
from pydub import AudioSegment
import io
def convert_to_opus(input_file, output_file=None):
"""将音频文件转换为 OPUS 格式"""
# 读取音频文件
audio = AudioSegment.from_file(input_file)
# 转换为单声道 16kHz
audio = audio.set_channels(1)
audio = audio.set_frame_rate(16000)
# 导出为 OPUS
if output_file:
audio.export(output_file, format="opus", codec="libopus")
return output_file
else:
# 导出到内存
buffer = io.BytesIO()
audio.export(buffer, format="opus", codec="libopus")
return buffer.getvalue()
# 使用
opus_data = convert_to_opus("input.mp3", "output.opus")
print(f"转换完成: {opus_data}")完整的录音应用
结合信号和音频的完整示例:
import asyncio
from streamind_sdk import SDK, Config, Signal
class AudioRecorder:
def __init__(self, sdk, terminal_id):
self.sdk = sdk
self.terminal_id = terminal_id
self.is_recording = False
async def start_recording(self):
"""开始录音"""
self.is_recording = True
# 发送录音开始信号
signal = Signal("audio.status")
signal.get_payload().set_string("status", "recording")
await self.sdk.send_signal(self.terminal_id, signal)
print("录音已开始")
async def stop_recording(self):
"""停止录音"""
self.is_recording = False
# 发送录音结束信号
signal = Signal("audio.status")
signal.get_payload().set_string("status", "stopped")
await self.sdk.send_signal(self.terminal_id, signal)
print("录音已停止")
async def send_audio_chunk(self, audio_data):
"""发送音频块"""
if self.is_recording:
await self.sdk.send_audio_data(self.terminal_id, audio_data)
async def main():
config = Config(...)
sdk = SDK()
sdk.register_terminal("terminal-1", config)
recorder = AudioRecorder(sdk, "terminal-1")
# 指令回调
async def on_directive(directive):
if directive.name == "audio.control":
payload = directive.get_payload()
action = payload.get_string("action")
if action == "start":
await recorder.start_recording()
elif action == "stop":
await recorder.stop_recording()
sdk.set_directive_callback("terminal-1", on_directive)
await sdk.connect("terminal-1")
print("音频系统已就绪")
# 保持运行
await asyncio.sleep(3600)
await sdk.disconnect("terminal-1")
asyncio.run(main())依赖安装
# 基础音频处理
pip install pyaudio
# OPUS 编码
pip install opuslib
# 音频格式转换
pip install pydub注意事项
- 音频格式:StreamInd 平台要求 OPUS 格式
- 采样率:推荐使用 16kHz
- 声道:推荐使用单声道
- 分块大小:根据网络情况调整,一般 4KB-8KB
- 错误处理:音频发送失败时应有重试机制
下一步
Last updated on