Flink Agents 源码解读 --- (5) --- ActionExecutionOperator

张开发
2026/4/17 1:57:42 15 分钟阅读

分享文章

Flink Agents 源码解读 --- (5) --- ActionExecutionOperator
0x00 摘要ActionExecutionOperator 是整个Flink Agent 系统的执行引擎它连接了 Flink 流处理框架和 Agent 逻辑协调各种组件完成了 Agent 定义的动作执行。ActionExecutionOperator 主要职责事件处理接收来自上游的数据包装成InputEvent动作执行根据Agent定义的动作规则触发相应的处理逻辑状态管理维护短期记忆检查点状态等异步支持处理需要异步执行的任务Python/Java交互操作协调组件间的交互输出产生将最终结果作为OutputEvent发送到下游这样整个Agent的逻辑就被集成到了标准的Flink流处理管道中能够利用Flink的分布式计算能力和容错机制。本篇主要是为了学习 ActionExecutionOperator 的架构生成机制和策略也会学习 ActionExecutionOperator 如何使用 AgentPlan如何生成 ActionTask。0x01 基础知识ActionExecutionOperator 算子是整个 Flink Agent 框架运行时的核心组件负责将用户定义的代理逻辑转换为实际的流处理操作。1.1 总体架构可以把 Flink Agents 的整个执行流程比作 “做一道菜”我们借此进行分析。Flink Agents 是基于原生 Flink 分布式流处理能力封装的上层框架。其中四个主要组件代表了 Flink Agents 框架中的四个层次Agent顶层设计定义了“做什么”用户定义的智能实体类似 “餐厅菜单 规则手册”包含业务逻辑、动作Action和资源工具、模型等定义明确 “做什么”。AgentPlan中间编译层确定了“怎么做”将 Agent 编译后的可执行计划类似 “详细操作流程图”明确动作触发规则、资源映射关系确定 “怎么做”。ActionExecutionOperator运行时执行层是执行环境负责“协调调度”Flink 集群中的执行核心在 Flink 流处理环境中实际执行操作类似 “餐厅首席大厨”负责接收数据、调度任务、管理状态协调整体执行流程。ActionTask最小执行单元负责“具体实施”具体的执行任务类似 “员工的单个服务步骤”分为 JavaActionTask 和 PythonActionTask处理单个事件并返回结果。具体可以参见下图。[Agent] 菜单手册 ↓编译 [AgentPlan] 详细流程图 ↓运行时实例化 [ActionExecutionOperator] 餐厅首席大厨 ↓分配任务 [ActionTask] 员工具体任务这样的设计使得系统既灵活又高效能够处理复杂的AI代理任务同时保证了良好的扩展性和维护性。1.2 定义ActionExecutionOperator 的定义如下。/** * An operator that executes the actions defined in the agent. Upon receiving data from the * upstream, it first wraps the data into an {link InputEvent}. It then invokes the corresponding * action that is interested in the {link InputEvent}, and collects the output event produced by * the action. * * pFor events of type {link OutputEvent}, the data contained in the event is sent downstream. * For all other event types, the process is repeated: the event triggers the corresponding action, * and the resulting output event is collected for further processing. */ public class ActionExecutionOperatorIN, OUT extends AbstractStreamOperatorOUT implements OneInputStreamOperatorIN, OUT, BoundedOneInput { private static final long serialVersionUID 1L; private static final Logger LOG LoggerFactory.getLogger(ActionExecutionOperator.class); private static final String RECOVERY_MARKER_STATE_NAME recoveryMarker; private static final String MESSAGE_SEQUENCE_NUMBER_STATE_NAME messageSequenceNumber; private static final String PENDING_INPUT_EVENT_STATE_NAME pendingInputEvents; private final AgentPlan agentPlan; private final Boolean inputIsJava; private transient StreamRecordOUT reusedStreamRecord; private transient MapStateString, MemoryObjectImpl.MemoryItem shortTermMemState; // PythonActionExecutor for Python actions private transient PythonActionExecutor pythonActionExecutor; private transient FlinkAgentsMetricGroupImpl metricGroup; private transient BuiltInMetrics builtInMetrics; private transient SegmentedQueue keySegmentQueue; private final transient MailboxExecutor mailboxExecutor; // We need to check whether the current thread is the mailbox thread using the mailbox // processor. // TODO: This is a temporary workaround. In the future, we should add an interface in // MailboxExecutor to check whether a thread is a mailbox thread, rather than using reflection // to obtain the MailboxProcessor instance and make the determination. private transient MailboxProcessor mailboxProcessor; // An action will be split into one or more ActionTask objects. We use a state to store the // pending ActionTasks that are waiting to be executed. private transient ListStateActionTask actionTasksKState; // To avoid processing different InputEvents with the same key, we use a state to store pending // InputEvents that are waiting to be processed. private transient ListStateEvent pendingInputEventsKState; // An operator state is used to track the currently processing keys. This is useful when // receiving an EndOfInput signal, as we need to wait until all related events are fully // processed. private transient ListStateObject currentProcessingKeysOpState; private final transient EventLogger eventLogger; private final transient ListEventListener eventListeners; private transient ActionStateStore actionStateStore; private transient ValueStateLong sequenceNumberKState; private transient ListStateObject recoveryMarkerOpState; private transient MapLong, MapObject, Long checkpointIdToSeqNums; // This in memory map keep track of the runner context for the async action task that having // been finished private final transient MapActionTask, RunnerContextImpl actionTaskRunnerContexts;1.3 关键设计ActionExecutionOperator 关键设计要点如下状态管理使用Flink的状态后端管理短期记忆和处理状态异步支持通过邮箱线程机制支持异步操作执行容错恢复通过checkpoint机制实现状态持久化和恢复资源隔离每个key拥有独立的处理上下文和状态多语言支持同时支持Java和Python实现的动作这种设计使得复杂的Agent逻辑能够在Flink的分布式流处理环境中高效、可靠地执行1.4 流程ActionExecutionOperator 的工作流程接收数据从上游接收数据并包装成 InputEvent动作触发依据事件类型查找并触发相应的动作动作执行执行动作逻辑可能产生新的事件事件处理处理动作产生的事件如果是输入事件则发送给下游状态更新更新内存状态和执行状态循环处理继续处理新产生的事件直到没有待处理的事件。关键特性事件驱动基于事件的处理模型事件触发动作执行状态一致性通过Flink的状态管理来保证处理一致性容错恢复支持从检查点恢复执行状态混合执行同时支持 java 和 python 动作的执行内存管理管理Agent的短期内存状态使用流程图如下graph TD A[AgentsExecutionEnvironment] -- B[RemoteExecutionEnvironment] B -- C[ActionExecutionOperator] C -- D[AgentPlan] C -- D[Flink State Backend] C -- D[Python Components] C -- D[Event Processing Components]1.5 组件 关系ActionExecutionOperator 是 Flink Agent 系统中的核心执行组件位于 Flink 流处理管道中负责执行 Agent 定义的各种动作。其主要交互组件如下上游组件DataStream API作为 Flink 流处理管道中的操作符接收来自上游 DataStream 的数据Input 数据源接收各种类型的输入数据将其包装为 InputEventKeyedStream处理按键分组的数据流下游组件Output 数据接收器将OutputEvent 中的数据发送到下游操作符DataStream API输出处理结果到下游 DataStream 操作符核心依赖组件工作流程中的交互graph TD A[DataStream Source] -- B[ActionExecutionOperator] B -- C[DataStream Sink] B -- D[AgentPlan] B -- E[ActionTask] B -- F[RunnerContext] B -- G[Flink State Backend] B -- H[PythonActionExecutor] B -- I[ActionStateStore]1.6 核心依赖组件AgentPlanAgentPlan会被 ActionExecutionOperator 使用因为操作符需要访问AgentPlan中的动作定义和资源配置来执行相应逻辑包含Agent的所有动作定义和资源配置提供动作触发逻辑和资源配置信息ActionTask 及其子类JavaActionTask 执行 Java 动作PythonActionTask 执行Python动作EnvironmentFlink Streaming Runtime EnvironmentActionExecutionOperator 是一个Flink 流处理操作符会被 Flink 流处理运行时环境使用作为流处理作业 DAG 图中的一个节点。RemoteExecutionEnvironment在远程执行环境中RemoteExecutionEnvironment 会创建和使用 ActionExecutionOperator 来执行Agent 逻辑。RunnerContext 及其实现RunnerContextImplJava 动作执行上下文PythonRunnerContextImplPython 动作执行上下文状态管理组件。以下状态存储组件会被 ActionExecutionOperator 使用。Flink 状态后端用于短期内存、待处理任务等。ActionStateStoreKafkaActionStateStore可选的外部状态存储用于动作状态持久化和恢复。Python相关组件。当代理包含Python动作时以下组件会与 ActionExecutionOperator 协同工作。PythonActionExecutor管理Python环境和执行Python动作PythonActionTask表示Python动作任务PythonRunnerContextImpl Python动作执行上下文。辅助组件EventLogger事件日志记录器EventListner事件监听器BultInMetrics内置指标收集FlinkAgentsMetricGroupImpl指标组实现事件处理相关组件各种Event类型InputEvent、OutputEvent、PythonEvent等SegmentedQueue管理按键分段的事件队列和水印处理1.7 主要功能ActionExecutionOperator 主要功能如下动作执行核心执行Agent中定义的所有动作actions处理事件驱动的执行流程根据事件类型触发的相应的动作事件处理流程接收上游数据并将其包装成 InputEvent根据事件类型触发对应的 action 执行对应 OutputEvent 类型的事件将数据发送到下游对于其他类型的事件继续触发相应的动作来执行状态管理管理短期内存状态short-term memory维护动作执行状态和序列号支持检查点和故障恢复使用 Flink 状态后端持久化关键状态信息混合语言支持支持 Java 和 Python 两种语言编写的动作通过 PythonActionExecutor 处理 Python 动作的执行在同一个算子中协调 Java 和 Python 动作的执行异步处理支持支持异步动作执行通过 MailboxExector 管理异步任务处理长时间运行的动作任务监控和日志集成指标收集和监控支持事件日志记录提供内置指标跟踪执行情况1.8 运行ActionTask流程ActionTask 在 Flink 中的运行机制分三步完成创建、调度、执行全程由 ActionExecutionOperator 托管并利用 Flink 的邮箱线程保证并发安全具体如下。创建与初始化作业启动时ActionExecutionOperator 完成实例化内部持有 AgentPlan含全部 Action 定义。上游数据到达 processElement() 后先被封装成 InputEvent随后调用 processEvent() 开始处理。ActionTask 的创建与调度processEvent() 通过 getActionsTriggeredBy() 找出响应该事件的所有 Action。对每个 Action调用 createActionTask() 生成对应的 ActionTask 实例并存入 actionTasksState 状态。使用 Flink MailboxExecutor 提交异步任务mailboxExecutor.submit(() - tryProcessActionTaskForKey(key), process action task);保证后续逻辑在 mailbox 线程中顺序执行避免并发冲突。这是Agent与Flink进行结合的关键之一是Agent代码运行的关键。ActionTask 的执行tryProcessActionTaskForKey() → processActionTaskForKey() 从 actionTasksState 取出待处理任务。createAndSetRunnerContext() 为任务绑定运行时上下文内存、指标、邮箱检查等。调用 ActionTask.invoke() 执行具体动作逻辑若任务返回新生成器异步场景则循环创建后续 ActionTask 并再次提交邮箱实现“拆分-继续”流程。ActionTask.invoke() 返回 ActionTaskResult若任务已完成isFinished() 为 true则通过 processEvent() 向下游发送输出事件或触发新的 ActionTask并持久化内存状态。若任务未完成异步操作则保存 RunnerContext 供后续使用并将新生成的 ActionTask 加入 actionTasksState再次提交到 MailboxExecutor形成循环直到所有任务完成。状态管理与容错使用 Flink 状态后端持久化 actionTasksState、pendingInputEventsState 等在 checkpoint 时保存快照故障恢复后可继续执行。实现 snapshotState() 和 initializeState() 方法完成状态的快照与恢复。整个流程依托 Flink 核心特性事件驱动基于事件触发动作执行状态管理使用 Flink 状态后端管理中间状态容错机制通过 checkpoint 和状态恢复保证 exactly-once 语义异步处理通过 MailboxExecutor 实现高效的异步任务调度键控处理支持按键分区处理保证同一键的数据顺序处理。通过“状态存储 邮箱调度”双机制ActionTask 既能顺序处理单键事件又能优雅拆分异步步骤全程与 Flink 的并发模型无缝衔接。1.8 Flink MailboxExecutorMailboxExecutor 是 Flink 内部的一个“单线程邮筒”调度器作用可以一句话概括“外部线程只管投信内部让同一个算子/同一个键的所有任务按顺序一条一条执行避免并发竞争同时不阻塞整个 TaskManager”。为什么需要 MailboxExecutorFlink 算子可能被多线程同时调用网络线程、异步 I/O、用户回调。很多算子状态不是线程安全的例如 keyed state、Python 解释器。不能把整个算子锁起来否则背压会拖垮整个 TaskManager。→ 解决方案把“真正干活”的代码投递到同一个“邮箱”里由一条专用线程按顺序取出执行其他线程只负责“投信”瞬间返回不会阻塞。使用场景速览场景投递到邮箱的任务好处ActionTask 拆分tryProcessActionTaskForKey(key)同一键的任务顺序执行状态无锁Python UDF 异步PythonGeneratorActionTask解释器单线程避免 GIL 竞争网络线程回调onNext(record)网络线程立即返回不阻塞 TCP 接收工作模型用邮筒来隐喻任意线程网络、异步、用户 ├─→ mailbox.put(mail) // 非阻塞瞬间返回 └─→ 立即继续干别的 专用单线程Mailbox Thread ├─→ while(true) mail mailbox.take() ├─→ 顺序执行 mail.run() └─→ 更新状态、发下游、写 checkpoint邮箱使用无锁队列Disruptorput/take 都是 O(1) 且线程安全。单线程内部仍可异步例如生成器返回新的 ActionTask但状态读写无并发。对应示例如下// 投递任务 mailboxExecutor.submit(() - tryProcessActionTaskForKey(key), process action task); // 邮箱线程循环简化 while (isRunning) { Runnable task mailbox.take(); // 阻塞直到有信 task.run(); // 顺序执行 }0x02 ActionExecutionOperator 生成机制和策略2.1 生成位置ActionExecutionOperator 在 ActionExecutionOperatorFactory 类中创建。从代码结构可以看出这个工厂类被用在 CompileUtils.connectToAgent() 方法中。2.2 生成机制创建流程// 在 CompileUtils.java 中 private static K, IN, OUT DataStreamOUT connectToAgent( KeyedStreamIN, K keyedInputStream, AgentPlan agentPlan, TypeInformationOUT outTypeInformation, boolean inputIsJava) { return (DataStreamOUT) keyedInputStream .transform( action-execute-operator, outTypeInformation, new ActionExecutionOperatorFactory(agentPlan, inputIsJava) ) .setParallelism(keyedInputStream.getParallelism()); }操作符通过 Flink 的 transform() 方法创建该方法接收一个 ActionExecutionOperatorFactory。工厂模式实现ActionExecutionOperatorFactory 实现了 Flink 的 StreamOperatorFactory 接口public class ActionExecutionOperatorFactory implements StreamOperatorFactoryObject { private final AgentPlan agentPlan; private final Boolean inputIsJava; Override public T extends StreamOperatorObject T createStreamOperator( StreamOperatorParametersObject parameters) { ProcessingTimeService processingTimeService parameters.getProcessingTimeService(); MailboxExecutor mailboxExecutor parameters.getMailboxExecutor(); return (T) new ActionExecutionOperator( agentPlan, inputIsJava, processingTimeService, mailboxExecutor, null); // actionStateStore }2.3 关键生成策略策略 1基于计划的生成AgentPlan 包含关于动作、资源和配置的所有必要信息这个计划是从用户定义的 Python 或 Java Agent 生成的操作符的所有行为都在创建时由这个计划决定策略 2类型特定处理inputIsJava 参数决定了如何处理输入数据如果为 true输入和输出是 Java 对象如果为 false输入和输出是字节数组用于 Python 互操作策略 3资源管理资源在初始化期间注入到操作符中// 在 ActionExecutionOperator 构造函数中 public ActionExecutionOperator( AgentPlan agentPlan, Boolean inputIsJava, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, ActionStateStore actionStateStore) { ... } Override public T extends StreamOperatorObject T createStreamOperator( StreamOperatorParametersObject parameters) { ProcessingTimeService processingTimeService parameters.getProcessingTimeService(); MailboxExecutor mailboxExecutor parameters.getMailboxExecutor(); return (T) new ActionExecutionOperator( agentPlan, inputIsJava, processingTimeService, mailboxExecutor, null); // actionStateStore }2.4 操作符初始化过程当操作符打开时。状态初始化短期记忆状态shortTermMemState动作任务状态actionTasksState待处理事件状态pendingInputEventsState处理中的键跟踪状态currentProcessingKeysOpState组件设置Python 执行器如果需要指标收集事件日志记录动作状态存储可选恢复逻辑// 故障/重启后恢复处理 private void tryResumeProcessActionTasks() throws Exception { IterableObject keys currentProcessingKeysOpState.get(); if (keys ! null) { for (Object key : keys) { keySegmentQueue.addKeyToLastSegment(key); mailboxExecutor.submit( () - tryProcessActionTaskForKey(key), process action task); } } }2.5 与执行环境的集成本地环境和远程环境的生成过程不同远程环境Flink 集群// 在 RemoteExecutionEnvironment.java 中 Override public DataStreamObject toDataStream() { if (agentPlan null) { throw new IllegalStateException(Must apply agent before calling toDataStream); } if (outputDataStream null) { if (keySelector ! null) { outputDataStream CompileUtils.connectToAgent(inputDataStream, keySelector, agentPlan); } else { // 如果没有提供键选择器则使用简单的直键选择器 outputDataStream CompileUtils.connectToAgent(inputDataStream, x - x, agentPlan); } } return outputDataStream; }本地环境对于本地执行使用完全不同的 LocalRunner 而不是基于操作符的方法。2.6 总结ActionExecutionOperator 通过 Flink 的转换 API 使用工厂模式生成。生成策略重点关注声明式方法使用 AgentPlan 定义行为类型灵活性通过序列化支持 Java 和 Python状态管理正确管理 Flink 状态以实现容错资源注入提供对已配置资源的访问可恢复处理通过状态快照支持故障恢复这种设计允许操作符基于代理定义动态创建同时保持 Flink 流处理的保证。0x03 任务拆分机制详解3.1 任务拆分的基本原理为什么需要拆分任务ActionExecutionOperator 将复杂的 Action 拆分成多个 ActionTask 的主要原因是为了支持异步操作和状态管理。一个完整的 Action 可能包含以下复杂场景需要等待外部服务响应的异步调用需要分步骤执行的长时间运行任务需要在不同时间点产生多个输出事件的任务拆分粒度每个 ActionTask 代表 Action 中的一个执行单元通常对应以下情况之一Action 的初始执行步骤异步操作的触发和回调处理长时间运行任务的阶段性执行3.2 任务创建过程不同类型 ActionTask 的创建根据 Action 的执行类型Java 或 Python创建相应的 ActionTask 实现JavaActionTask处理 Java 实现的 ActionPythonActionTask处理 Python 实例的 Action动态创建 ActionTask在某些情况下ActionTask.invoke() 可能会产生新的 ActionTask 来继续处理异步操作完成后需要进一步处理结果长时间运行的任务需要分阶段执行需要等待某些条件满足后再继续执行具体对应代码中就是当 ActionTask 被执行时其 invoke() 方法可能会返回一个新的 ActionTask 来继续执行剩余的工作// PythonActionTask.invoke() 示例 public ActionTaskResult invoke() throws Exception { // 执行 Python 函数 String pythonGeneratorRef pythonActionExecutor.executePythonFunction(...); if (pythonGeneratorRef ! null) { // 如果返回了生成器引用创建新的 ActionTask 来处理后续步骤 ActionTask tempGeneratedActionTask new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef); tempGeneratedActionTask.setRunnerContext(runnerContext); return tempGeneratedActionTask.invoke(); } // 如果没有更多步骤标记任务完成 return new ActionTaskResult( true, runnerContext.drainEvents(event.getSourceTimestamp()), null); }也会在 processEvent 中启动新的task。private void processEvent(Object key, Event event) throws Exception { notifyEventProcessed(event); boolean isInputEvent EventUtil.isInputEvent(event); if (EventUtil.isOutputEvent(event)) { // If the event is an OutputEvent, we send it downstream. OUT outputData getOutputFromOutputEvent(event); if (event.hasSourceTimestamp()) { output.collect(reusedStreamRecord.replace(outputData, event.getSourceTimestamp())); } else { reusedStreamRecord.eraseTimestamp(); output.collect(reusedStreamRecord.replace(outputData)); } } else { if (isInputEvent) { // If the event is an InputEvent, we mark that the key is currently being processed. currentProcessingKeysOpState.add(key); initOrIncSequenceNumber(); } // We then obtain the triggered action and add ActionTasks to the waiting processing // queue. ListAction triggerActions getActionsTriggeredBy(event); if (triggerActions ! null !triggerActions.isEmpty()) { for (Action triggerAction : triggerActions) { actionTasksKState.add(createActionTask(key, triggerAction, event)); } } } if (isInputEvent) { // If the event is an InputEvent, we submit a new mail to try processing the actions. mailboxExecutor.submit(() - tryProcessActionTaskForKey(key), process action task); } }createActionTask 的代码如下。private ActionTask createActionTask(Object key, Action action, Event event) { if (action.getExec() instanceof JavaFunction) { return new JavaActionTask( key, event, action, getRuntimeContext().getUserCodeClassLoader()); } else if (action.getExec() instanceof PythonFunction) { return new PythonActionTask(key, event, action); } else { throw new IllegalStateException( Unsupported action type: action.getExec().getClass()); } }0x04 任务队列管理4.1 任务状态存储ActionExecutionOperator 使用 Flink 的状态管理来存储待处理的 ActionTask// 在 operator 中定义的状态 private transient ListStateActionTask actionTasksKState; // 初始化状态 actionTasksKState getRuntimeContext().getListState( new ListStateDescriptor(actionTasks, TypeInformation.of(ActionTask.class)));4.2 任务调度机制ActionExecutionOperator 使用 Flink 的 Mailbox 机制来调度 ActionTask 的执行private void processEvent(Object key, Event event) throws Exception { // ... 处理事件逻辑 ... if (isInputEvent) { // 提交邮件任务来处理 ActionTask mailboxExecutor.submit(() - tryProcessActionTaskForKey(key), process action task); } }4.3 任务处理循环tryProcessActionTaskForKey 方法实现了持续处理同一个 key 下的多个 ActionTask 的机制private void tryProcessActionTaskForKey(Object key) { try { processActionTaskForKey(key); } catch (Exception e) { // 错误处理 } } private void processActionTaskForKey(Object key) throws Exception { // 1. 获取待处理的 ActionTask ActionTask actionTask pollFromListState(actionTasksKState); if (actionTask null) { // 没有更多任务清状态 return; } // 2. 执行 ActionTask ActionTaskResult result actionTask.invoke(); // 3. 处理结果 if (!result.isFinished()) { // 如果未完成将生成的新任务加入队列 actionTasksKState.add(result.getGeneratedActionTask().get()); } // 4. 处理输出事件 for (Event outputEvent : result.getOutputEvents()) { processEvent(key, outputEvent); } // 5. 如果还有任务继续调度 if (currentKeyHasMoreActionTask()) { mailboxExecutor.submit(() - tryProcessActionTaskForKey(key), process action task); } }0x05 实际应用示例5.1 异步 HTTP 请求处理考虑一个需要调用外部 API 的 Actionaction(SomeEvent) def fetch_external_data(event, ctx): # 步骤1: 发起异步HTTP请求 future asyncio.create_task(http_client.get(fhttps://api.example.com/data/{event.id})) # 第一次执行到这里会暂停返回一个生成器 response await future # 步骤2: 处理响应数据 processed_data process_response(response) # 步骤3: 发送结果事件 ctx.send_event(ResultEvent(processed_data))这个 Action 会被拆分为多个 ActionTask初始任务发起 HTTP 请求并等待后续任务处理响应并发送结果5.2 复杂业务流程处理一个多步骤的业务流程也可能被拆分action(OrderEvent) def process_order(event, ctx): # 步骤1验证订单 validate_order(event.order) # 步骤2检查库存可能需要异步调用 inventory_status check_inventory(event.order.items) # 步骤3处理支付可能需要异步调用 payment_result process_payment(event.order) # 步骤4更新订单状态 update_order_status(event.order.id, completed) # 步骤5发送通知 send_notification(event.customer_email, Order completed)

更多文章