An event-driven streaming Agentic AI framework built on Apache Flink, featuring millisecond-level real-time processing, exactly-once action consistency, and seamless integration with Flink DataStream/Table APIs.
Apache Flink Agents is an official Apache Flink subproject, positioned as an event-driven streaming Agentic AI framework. It addresses the limitations of traditional AI agent frameworks in real-time streaming scenarios, providing millisecond-level response, large-scale distributed execution, and exactly-once action consistency guarantees.
Core Capabilities#
- Streaming Engine: Real-time event processing powered by Flink's distributed streaming engine with millisecond-level response
- Agent Paradigms: Workflow Agent (DAG orchestration) and ReAct Agent (reasoning-action loop)
- Consistency Guarantee: Exactly-once action consistency via checkpoint + external WAL
- Multi-language Support: Native Python and Java APIs
- DataStream Integration: Bidirectional conversion with Flink DataStream via
from_datastream/to_datastream
AI Ecosystem Integration#
- LLM Providers: Anthropic, AzureAI, Ollama, OpenAI
- Vector Stores: Elasticsearch
- MCP Protocol: Model Context Protocol tool integration
- Memory Mechanisms: Sensory memory, short-term memory, long-term memory
Typical Use Cases#
- Real-time comment/feedback analysis: Extract sentiment, ratings, and pain points from streaming data
- Real-time monitoring and alerting: Intelligent anomaly detection and response based on event streams
- Multi-agent collaborative workflows: Orchestrate multi-Agent DAGs within Flink ecosystem
- Action-critical applications: Financial trading, inventory management requiring strong consistency
Runtime Requirements#
| Dependency | Version Requirement |
|---|---|
| Flink | ≥ 1.20.3 |
| Java | 11+ (21+ for full functionality) |
| Python | 3.10 or 3.11 |
Development Example#
# Create execution environment and register resources
env = AgentsExecutionEnvironment.get_execution_environment()
env.register_chat_model("my_llm", chat_model_setup)
# Define Agent
@agent(prompt="...", tools=[...], chat_model_setup="my_llm")
class MyAgent:
@action(input_event=InputEvent, output_event=OutputEvent)
def process(self, event):
return OutputEvent(...)
# Integrate with DataStream
stream = env.from_datastream(input_stream)
result = stream.apply(MyAgent)
result.to_datastream()
Observability#
Event-centric orchestration design where all agent actions are connected and controlled via events, enabling behavior understanding through event logs with monitoring and audit support.
Version Status#
Currently at v0.3-SNAPSHOT stage, under active development. Apache-2.0 licensed.