Flink-异步operator
当某个operator执行需要很长时间的话,使用异步操作对吞吐量的提升非常有帮助。
其实过程重要基于以下几个步骤:
- 处理element过程异步化,将处理element的过程提交到一个线程池中
- 处理结果完成后,将结果提交给ResultFuture中
- ResultFuture提交到结果队列中,队列分为有序和无序两种
- 有序队列中,有一个ResultFuture队列,如果有任何一个ResultFuture完成,会调用headIsCompleted.signalAll()来通知正在等待结果的线程有新元素到来
- 无序队列中,有两个集合,completedQueue,firstSet 当有新元素近来时会先放置到firstSet中,当有处理过程完成时,将结果放到completedQueue中,并且调用hasCompletedEntries.signalAll()
- Emitter会开启一个专门的线程,这个线程中会调用queue.peekBlockingly()来监测是否有新的结果到来,当有结果时,调用output.collect将结果传递给下游
- AsyncWaitOperator继承自AbstractUdfStreamOperator,在初始的时候会开启Emitter线程
- 由于DataStream没有直接支持AsyncWaitOperator,因此调用DataStream.tranform来将AsyncWaitOperator添加到流中
- UserFunction继承RichAsyncFunction,初始化时开启一个线程池用于异步处理element。重写asyncInvoke(input, ResultFuture)方法,将处理过程封装成Runnable提交到线程中,完成之后调用ResultFuture.complete来执行后续的流程