构建在 Apache Flink 之上的事件驱动流式 Agentic AI 框架,支持毫秒级实时处理、Exactly-Once 行动一致性,以及与 Flink DataStream/Table API 的无缝集成。
Apache Flink Agents 是 Apache Flink 官方子项目,定位为事件驱动的流式 Agentic AI 框架。它解决了传统 AI 智能体框架在实时流处理场景中的局限性,提供毫秒级响应、大规模分布式执行以及 Exactly-Once 行动一致性保证。
核心能力#
- 流处理引擎:基于 Flink 分布式流引擎的实时事件处理,支持大规模毫秒级响应
- 智能体模式:Workflow Agent(DAG 编排)与 ReAct Agent(推理-行动循环)两种范式
- 一致性保证:通过 checkpoint + 外部 WAL 实现 Exactly-Once 行动一致性
- 多语言支持:原生 Python 与 Java API
- DataStream 集成:
from_datastream/to_datastream与 Flink DataStream 双向互转
AI 生态集成#
- LLM 提供商:Anthropic、AzureAI、Ollama、OpenAI
- 向量存储:Elasticsearch
- MCP 协议:Model Context Protocol 工具集成
- 记忆机制:感官记忆、短期记忆、长期记忆
典型应用场景#
- 实时评论/反馈分析:从流式数据中提取情感、评分、问题点
- 实时监控与告警:基于事件流的智能异常检测与响应
- 多智能体协作工作流:在 Flink 生态内编排多 Agent DAG
- 需要强一致性的智能体行动:金融交易、库存管理等对副作用敏感的场景
运行环境要求#
| 依赖 | 版本要求 |
|---|---|
| Flink | ≥ 1.20.3 |
| Java | 11+(21+ 获完整功能) |
| Python | 3.10 或 3.11 |
开发示例#
# 创建执行环境并注册资源
env = AgentsExecutionEnvironment.get_execution_environment()
env.register_chat_model("my_llm", chat_model_setup)
# 定义 Agent
@agent(prompt="...", tools=[...], chat_model_setup="my_llm")
class MyAgent:
@action(input_event=InputEvent, output_event=OutputEvent)
def process(self, event):
return OutputEvent(...)
# 与 DataStream 集成
stream = env.from_datastream(input_stream)
result = stream.apply(MyAgent)
result.to_datastream()
可观测性#
以事件为中心的编排方式,所有智能体动作通过事件串联与控制,便于通过事件日志理解智能体行为,支持 Monitoring 监控与审计。
版本状态#
当前为 v0.3-SNAPSHOT 阶段,活跃开发中。Apache-2.0 许可证。