Flink-异步operator

当某个operator执行需要很长时间的话,使用异步操作对吞吐量的提升非常有帮助。

其实过程重要基于以下几个步骤:

  1. 处理element过程异步化,将处理element的过程提交到一个线程池中
  2. 处理结果完成后,将结果提交给ResultFuture中
  3. ResultFuture提交到结果队列中,队列分为有序和无序两种
  4. 有序队列中,有一个ResultFuture队列,如果有任何一个ResultFuture完成,会调用headIsCompleted.signalAll()来通知正在等待结果的线程有新元素到来
  5. 无序队列中,有两个集合,completedQueue,firstSet 当有新元素近来时会先放置到firstSet中,当有处理过程完成时,将结果放到completedQueue中,并且调用hasCompletedEntries.signalAll()
  6. Emitter会开启一个专门的线程,这个线程中会调用queue.peekBlockingly()来监测是否有新的结果到来,当有结果时,调用output.collect将结果传递给下游
  7. AsyncWaitOperator继承自AbstractUdfStreamOperator,在初始的时候会开启Emitter线程
  8. 由于DataStream没有直接支持AsyncWaitOperator,因此调用DataStream.tranform来将AsyncWaitOperator添加到流中
  9. UserFunction继承RichAsyncFunction,初始化时开启一个线程池用于异步处理element。重写asyncInvoke(input, ResultFuture)方法,将处理过程封装成Runnable提交到线程中,完成之后调用ResultFuture.complete来执行后续的流程

评论