

文|祝融
编辑|郭嘉

- 结构化数据载荷:
import time
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ProcessorPart:
    """One unit of data flowing through a processor pipeline.

    Declared as a dataclass so it can be built with keyword arguments, e.g.
    ``ProcessorPart(content=..., metadata=...)``; a plain class with only
    annotations has no such constructor. Only ``content`` is required; the
    other fields get defaults so callers that omit them still work.
    """

    content: "ProcessorContent"  # actual data payload (forward reference; type defined elsewhere)
    metadata: Dict[str, Any] = field(default_factory=dict)  # metadata dict; a fresh dict per instance
    mime_type: str = ""  # MIME type identifier; "" means not specified
    timestamp: float = field(default_factory=time.time)  # creation time from time.time()
    sequence_id: str = ""  # sequence identifier; "" means not specified
- 异步流处理能力:该库提供了用于分割、连接和合并 ProcessorPart 异步流的实用工具。这意味着数据可以在不阻塞主线程的情况下连续处理:
async def process_stream(input_stream: AsyncIterator[ProcessorPart]) -> AsyncIterator[ProcessorPart]:
    """Transform an async stream part by part, preserving order.

    Each incoming part is passed to ``transform_part`` (an awaitable defined
    elsewhere), and its result is yielded before the next part is read.
    """
    async for incoming in input_stream:
        yield await transform_part(incoming)
- 双向流控制:与传统的单向数据流不同,GenAI Processors 支持双向流控制,允许下游处理器向上游发送反馈信息:
class BidirectionalProcessor:
    """Processor that consumes an input stream together with a feedback stream.

    Subclasses provide ``handle_with_feedback(input_part, feedback_part)``,
    an awaitable that is called here but not defined in this class.
    """

    async def process(self, input_stream, feedback_stream):
        """Pair input and feedback parts and yield one result per pair.

        The builtin ``zip`` only accepts synchronous iterables, so the two
        async streams are advanced manually. Pairing follows ``zip``
        semantics: it stops as soon as either stream is exhausted, and the
        feedback stream is not advanced once the input stream has ended.

        Args:
            input_stream: async iterable of input parts.
            feedback_stream: async iterable of feedback parts.

        Yields:
            The result of ``handle_with_feedback`` for each pair, in order.
        """
        inputs = input_stream.__aiter__()
        feedback = feedback_stream.__aiter__()
        while True:
            try:
                input_part = await inputs.__anext__()
                feedback_part = await feedback.__anext__()
            except StopAsyncIteration:
                return
            yield await self.handle_with_feedback(input_part, feedback_part)

class Processor(ABC):
    """Abstract base for stream processors.

    A processor maps an async stream of ProcessorParts to another async
    stream of ProcessorParts. Instances are callable: ``proc(stream)`` is
    shorthand for ``proc.process(stream)``.
    """

    @abstractmethod
    async def process(self, input_stream: AsyncIterator[ProcessorPart]) -> AsyncIterator[ProcessorPart]:
        """Transform ``input_stream``; concrete subclasses must implement this."""

    def __call__(self, input_stream):
        """Delegate to ``process`` and hand back whatever it returns."""
        return self.process(input_stream)
# Processing chain: each stage is constructed once at module level.
audio_processor = AudioTranscriber()
text_processor = TextAnalyzer()
response_generator = ResponseGenerator()


async def process_audio_input(audio_stream):
    """Chain the stages: audio -> transcription -> analysis -> responses.

    Each stage is called on the previous stage's output, and nothing is
    awaited here. Callers ``await`` this coroutine to obtain the output of
    ``response_generator``.
    """
    # Innermost call runs first: transcribe, then analyze, then respond.
    return response_generator(text_processor(audio_processor(audio_stream)))


- AudioProcessor: 处理音频数据的专用处理器
- TextProcessor: 文本处理和分析
- ImageProcessor: 图像和视频帧处理
- ModelProcessor: 与AI模型交互的处理器
- StreamSplitter: 将单一流分割为多个并行流
- StreamMerger: 合并多个流为单一输出
- FilterProcessor: 基于条件过滤数据
- TransformProcessor: 数据格式转换
1. 同步文本处理
from genai_processors.models import GeminiTextProcessor
# Gemini-backed text processor used by process_text_query.
# NOTE(review): "your-api-key" is a placeholder; load the real key from the
# environment or a secret store instead of hard-coding it.
text_processor = GeminiTextProcessor(
    model_name="gemini-pro",
    api_key="your-api-key",
    temperature=0.7,
    max_tokens=1000,
)


async def process_text_query(query: str):
    """Send ``query`` to ``text_processor`` and return the first reply's text.

    The query is wrapped in a single ProcessorPart. ``async_iter`` is defined
    elsewhere; presumably it turns a list into an async iterator. Returns
    None when the processor yields no parts.
    """
    request = ProcessorPart(
        content=TextContent(query),
        metadata={"user_id": "123", "session_id": "abc"},
    )
    async for first_reply in text_processor(async_iter([request])):
        return first_reply.content.text
    return None
2. Live API流式处理
from genai_processors.models import GeminiLiveProcessor
# Live (streaming) Gemini processor. "your-api-key" is a placeholder.
live_processor = GeminiLiveProcessor(
    model_name="gemini-live",
    api_key="your-api-key",
    streaming=True,
    real_time_factor=1.0,
)


async def handle_live_audio(audio_stream):
    """Stream audio chunks to ``live_processor`` and yield audio replies.

    All chunks go through ONE processor invocation via a single continuous
    input stream. Invoking the processor once per chunk, each with its own
    one-element stream, would start a separate run for every chunk. It would
    also block reading the next chunk until all replies to the current one
    had arrived, which defeats live streaming. It presumably also loses
    context across chunks (depends on the processor; verify).

    Args:
        audio_stream: async iterable of raw audio chunks.

    Yields:
        ``audio_data`` of response parts whose content type is "audio".
        Text responses are printed as transcriptions and not yielded.
    """

    async def _audio_parts():
        # Wrap each raw chunk in a ProcessorPart as soon as it arrives.
        async for audio_chunk in audio_stream:
            yield ProcessorPart(
                content=AudioContent(audio_chunk),
                metadata={"format": "wav", "sample_rate": 16000},
            )

    async for response in live_processor(_audio_parts()):
        content_type = response.content.type
        if content_type == "audio":
            yield response.content.audio_data
        elif content_type == "text":
            print(f"Transcription: {response.content.text}")
class AsyncModelProcessor:
    """Send ProcessorParts to an HTTP model endpoint and parse the replies.

    Instances must have an ``api_endpoint`` attribute (the URL that is
    POSTed to); it is read here but set elsewhere.
    """

    async def process_batch(self, inputs: List[ProcessorPart]):
        """Process every part in ``inputs`` concurrently.

        All requests share one ``aiohttp.ClientSession`` so they reuse pooled
        connections instead of each opening and tearing down its own session.

        Returns:
            A list of result parts in the same order as ``inputs``
            (``asyncio.gather`` preserves order); ``[]`` for empty input.
        """
        if not inputs:
            return []
        async with aiohttp.ClientSession() as session:
            tasks = [self.process_single(input_part, session=session) for input_part in inputs]
            return await asyncio.gather(*tasks)

    async def process_single(self, input_part: ProcessorPart, session=None):
        """POST one part to ``self.api_endpoint`` and build a part from the JSON reply.

        Args:
            input_part: the part to send; serialized with its ``to_dict()``.
            session: optional ``aiohttp.ClientSession`` to reuse. When omitted,
                a temporary session is opened and closed for this request.

        Raises:
            aiohttp.ClientResponseError: if the endpoint returns an error status.
        """
        if session is not None:
            return await self._post(session, input_part)
        async with aiohttp.ClientSession() as own_session:
            return await self._post(own_session, input_part)

    async def _post(self, session, input_part):
        """Send one request on ``session`` and parse the JSON response into a part."""
        # ``async with`` on the request releases the response and its
        # connection even if reading or parsing the body fails.
        async with session.post(self.api_endpoint, json=input_part.to_dict()) as response:
            response.raise_for_status()
            return ProcessorPart.from_response(await response.json())
使用 + 运算符组合输入源和处理步骤,从而创建清晰的数据流:
from genai_processors.core import audio_io, live_model, video
import asyncio

from genai_processors import streams

# Input processor: combines camera streams and audio streams
input_processor = video.VideoIn() + audio_io.PyAudioIn(...)
# Output processor: plays the audio parts. Handles interruptions and pauses
# audio output when the user is speaking.
play_output = audio_io.PyAudioOut(...)
# Gemini Live API processor (constructed exactly once)
live_processor = live_model.LiveProcessor(...)
# Compose the agent: mic+camera -> Gemini Live API -> play audio
live_agent = input_processor + live_processor + play_output


async def run_live_agent():
    """Drive the composed live agent over an endless input stream.

    A module-level ``async for`` is a SyntaxError outside an async REPL or
    notebook, so the loop lives in a coroutine that ``asyncio.run`` drives.
    """
    async for part in live_agent(streams.endless_stream()):
        # Process the output parts (e.g., print transcription, model output, metadata)
        print(part)


if __name__ == "__main__":
    asyncio.run(run_live_agent())

class BatchProcessor:
def __init__(self, batch_size: int = 32, max_concurrency: int = 10):
self.batch_size = batch_size
self.semaphore = asyncio.Semaphore(max_concurrency)
async def process_batch(self, input_stream):
batch = []
async for item in input_stream:
batch.append(item)
if len(batch) >= self.batch_size:
async with self.semaphore:
results = await self.process_batch_items(batch)
for result in results:
yield result
batch = []
if batch:
async with self.semaphore:
results = await self.process_batch_items(batch)
for result in results:
yield result
class CustomAudioProcessor(Processor):
    """Transcribe audio parts with a locally loaded model.

    Every incoming part must carry ``AudioContent``. Each one becomes a part
    holding ``TextContent`` with the transcription, the original metadata,
    and the model's ``confidence`` and ``processing_time``.
    """

    def __init__(self, model_path: str, config: Dict[str, Any]):
        """Load the model from ``model_path`` and keep ``config`` for preprocessing."""
        self.model = load_model(model_path)
        self.config = config

    async def process(self, input_stream: AsyncIterator[ProcessorPart]) -> AsyncIterator[ProcessorPart]:
        """Yield one transcription part per incoming audio part.

        Raises:
            ValueError: on the first part whose content is not AudioContent.
        """
        async for incoming in input_stream:
            payload = incoming.content
            # Validate the input type before any processing.
            if not isinstance(payload, AudioContent):
                raise ValueError(f"Expected AudioContent, got {type(payload)}")
            features = await self.preprocess_audio(payload.audio_data)
            prediction = await self.model.predict(features)
            # Output keeps the caller's metadata and adds the model's scores.
            yield ProcessorPart(
                content=TextContent(prediction.transcription),
                metadata={
                    **incoming.metadata,
                    'confidence': prediction.confidence,
                    'processing_time': prediction.processing_time,
                },
            )

    async def preprocess_audio(self, audio_data: bytes) -> np.ndarray:
        """Decode raw int16 samples into float32, optionally resampling.

        Samples are scaled by 1/32768, which maps the int16 range to
        [-1.0, 1.0). If ``config['target_sample_rate']`` is set (truthy), the
        array is passed through ``resample``.
        """
        samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
        target_rate = self.config.get('target_sample_rate')
        if target_rate:
            # NOTE(review): ``resample`` is defined elsewhere. If it is
            # scipy.signal.resample, its second argument is the number of
            # output SAMPLES, not a sample rate, so this would always yield
            # exactly ``target_rate`` samples regardless of input length.
            # Confirm which function is meant and whether the source rate
            # must also be passed.
            samples = resample(samples, target_rate)
        return samples
PartProcessor的高级用法
对于需要更细粒度控制的场景,可以使用PartProcessor:
class AdvancedPartProcessor(PartProcessor):
    """Part-level processor that splits oversized payloads into chunks.

    Uses members not defined in this class (presumably supplied by
    ``PartProcessor`` or a subclass; not visible here): ``max_chunk_size``,
    ``split_content``, ``process_chunk`` and ``process_single_part``.
    """

    async def process_part(self, part: ProcessorPart) -> AsyncIterator[ProcessorPart]:
        """Yield processed output for ``part``.

        If ``part.content.size`` exceeds ``max_chunk_size``, the content is
        split with ``split_content``. Each chunk is wrapped in a new part
        carrying the original metadata plus ``chunk_index`` and
        ``total_chunks``, processed with ``process_chunk``, and yielded in
        order. Otherwise the whole part goes through ``process_single_part``.
        """
        if not part.content.size > self.max_chunk_size:
            # Small payload: handle it in one call.
            yield await self.process_single_part(part)
            return

        # Large payload: split it and process the pieces sequentially.
        pieces = await self.split_content(part.content)
        for index, piece in enumerate(pieces):
            piece_part = ProcessorPart(
                content=piece,
                metadata={
                    **part.metadata,
                    'chunk_index': index,
                    'total_chunks': len(pieces),
                },
            )
            yield await self.process_chunk(piece_part)
- LLM背后的基础模型
- 如何优雅的谈论大模型
- 体系化的通识大模型