简介 窗口机制是Flink流处理的核心,它将无限的流元素分割成有限的窗口。当一个窗口不再增加新元素时,就可以对这个窗口中的所有元素执行计算逻辑。而判断窗口不会再增加新元素的方式有:时间(Watermark)、计数、自定义。其中Watermark
代表事件发生时的时间戳或者Flink收到事件时的时间戳。
Watermark
可以分为三个部分来理解:
来源 Watermark
的来源,它是如何产生的?在不同的配置下,它的产生方式有什么不同?
流转 当产生了一个Watermark
时,它流转到WindowOperator
时都会经过哪些过程?
触发 当WindowOperator
收到Watermark
时,如何处理?这个过程是通过触发器来处理的,用户可以自定义触发器,也可以使用系统提供的触发器,可参考下EventTimeTrigger
的实现。本章不再叙述
注意:Watermark
如果不能及时产生或者流转到WindowOperator
就会造成窗口计算的延迟甚至导致窗口一直不会被触发。
Watermark来源 Watermark的来源有三种:
AutomaticWatermarkContext
自动以系统时间来做为Watermark
,只能发生在SourceTask
用户主动调用SourceContext.emitWark()
,只能发生在SourceTask
TimestampsAndPeriodicWatermarksOperator
从elemet中提取出Watermark
,通过定时任务发射Watermark
,这一步骤可以发生在拓扑图中的任意一个环节。
SourceContext的初始化 首先看下SourceStream
算子的启动过程:
SourceContext
的作用主要是用于发射elements
,也有可能发射Watermark
,从Source算子启动的流程图中可以看出一共它有三种类型。SourceStream在启动的时候会根据配置的时间类型来选择对应的SourceContext
,它的直接子类只有两个WatermarkContext
和 NonTimestampContext
。
WatermarkContext
处理与Watermark
相关的操作。也要维护当前Stream的StreamStatus
,使stream的状态能正确传递给下游。这样做是因为流的状态能会影响下游到对Watermark
的处理 。下游处理StreamStatus
和Watermark
的相关逻辑可以参考StatusWatermarkValve
,它会决定元素是否继续传递给下游。
WatermarkContext
又有两个子类:
NonTimestampContext
发射的所有元素都不会携带时间戳,并且它也不不能发射Watermark
。对应的配置是ProcessingTime
,它不会产生Watermark
,而是由WindowOperator
在收到新元素时,直接根据当前系统时间判断是否要触发计算逻辑。
AutomaticWatermarkContext逻辑 注册定时任务
1 2 3 4 5 6 7 8 9 private AutomaticWatermarkContext(...) { super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout); // 初始化过程 ... // 注册一个定时发射任务 long now = this.timeService.getCurrentProcessingTime(); this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval, new WatermarkEmittingTask(this.timeService, checkpointLock, output)); }
当收到一个元素时,会判断是否要发射Watermark
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 protected void processAndCollect(T element) { // 取系统时间做为Watermark lastRecordTime = this.timeService.getCurrentProcessingTime(); // 发射元素 output.collect(reuse.replace(element, lastRecordTime)); // this is to avoid lock contention in the lockingObject by // sending the watermark before the firing of the watermark // emission task. if (lastRecordTime > nextWatermarkTime) { // in case we jumped some watermarks, recompute the next watermark time final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval); nextWatermarkTime = watermarkTime + watermarkInterval; output.emitWatermark(new Watermark(watermarkTime)); } }
定时任务 WatermarkEmittingTask
,它是AutomaticWatermarkContext
的内部类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private class WatermarkEmittingTask implements ProcessingTimeCallback { ... @Override public void onProcessingTime(long timestamp) { final long currentTime = timeService.getCurrentProcessingTime(); synchronized (lock) { // we should continue to automatically emit watermarks if we are active if (streamStatusMaintainer.getStreamStatus().isActive()) { if (idleTimeout != -1 && currentTime - lastRecordTime > idleTimeout) { // 检测stream的状态,如果超过一定时间没有元素进来,就会将此stream置位空闲状态从而影响下有对watermark的处理 markAsTemporarilyIdle(); cancelNextIdleDetectionTask(); } else if (currentTime > nextWatermarkTime) { // 发射 final long watermarkTime = currentTime - (currentTime % watermarkInterval); output.emitWatermark(new Watermark(watermarkTime)); nextWatermarkTime = watermarkTime + watermarkInterval; } } } long nextWatermark = currentTime + watermarkInterval; // 注册下一次的定时任务 nextWatermarkTimer = this.timeService.registerTimer(nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output)); } }
TimestampsAndPeriodicWatermarksOperator 是一个StreamOperator
,可以在拓扑图中的任一环节。当job的时间类型为EventTime
时,在拓扑图中需要增加这个算子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class TimestampsAndPeriodicWatermarksOperator { ... public void open() throws Exception { ... currentWatermark = Long.MIN_VALUE; watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); // 如果间隔大于0表示开启定时发射watermark任务 if (watermarkInterval > 0) { long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } } public void processElement(StreamRecord<T> element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); // 将element的时间戳替换为从elemnt中提取的时间戳 output.collect(element.replace(element.getValue(), newTimestamp)); } public void onProcessingTime(long timestamp) throws Exception { // 从elemnt中提取时间戳 // 这里有一个问题,如果上游一直没有发送元素并且也没切换StreamStatus就会导致watermark阻塞到下游,不能再向前传递 Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); output.emitWatermark(newWatermark); } long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } public void processWatermark(Watermark mark) throws Exception { // 忽略上游传递的watermarks,仅传递当前算子产生的时间戳。除了代表流结束的watermark if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { currentWatermark = Long.MAX_VALUE; output.emitWatermark(mark); } } }
下游对Watermark的处理 下游接收上游数据的流程图:
从流程图中可以很清楚的看出来,Watermark、StreamStatus
都是交给StatusWatermarkValve
来进行处理。它相当于一个阀门,当收到水位线或者流的状态的时候判断是否满足相应的条件,如果满足才会再继续发射到下游。
首先看下阀门的一个内部类InputChannelStatus
,这个类非常重要,它封装了上游InputChannel
的状态,在计算过程中都会用到。
1 2 3 4 5 6 7 8 protected static class InputChannelStatus { // 最近一个水位线 long watermark; // 上游状态,Active 或者 Idle StreamStatus streamStatus; // 标示上游的水位线是否对齐,只有对齐时才会使用水位线 boolean isWatermarkAligned; }
处理水位线的大致流程如下:
这个流程复杂,重点看下如何找最小的水位线,这一步骤决定是否发射新的水位线到下游。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void findAndOutputNewMinWatermarkAcrossAlignedChannels() { long newMinWatermark = Long.MAX_VALUE; boolean hasAlignedChannels = false; // 找最小的水位线 for (InputChannelStatus channelStatus : channelStatuses) { if (channelStatus.isWatermarkAligned) { hasAlignedChannels = true; newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark); } } // 由于所有的InputChannelStatus默认都是激活、对齐,且水位线=Long.MIN_VALUE。 // 如果某个上游(InputChannel)一直没有发送新的水位线且也没有切换状态。 // 这样newMinWatermark的值一直都是Long.MIN_VALUE,从而不会发射新的watermark // 上述情况在EventTime时,可能会发生。 if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) { lastOutputWatermark = newMinWatermark; outputHandler.handleWatermark(new Watermark(lastOutputWatermark)); } }
处理StreamStatus
的过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public void inputStreamStatus(StreamStatus streamStatus, int channelIndex) { if (streamStatus.isIdle() && channelStatuses[channelIndex].streamStatus.isActive()) { // 处理active -> idle的切换 channelStatuses[channelIndex].streamStatus = StreamStatus.IDLE; // 当channel位idle状态时,设置为非对齐状态,不会影响找newMinWatermark channelStatuses[channelIndex].isWatermarkAligned = false; // 如果所有的上游都为idle,就会给下游发送idle,并且当前的算子状态也置为idle if (!InputChannelStatus.hasActiveChannels(channelStatuses)) { if (channelStatuses[channelIndex].watermark == lastOutputWatermark) { findAndOutputMaxWatermarkAcrossAllChannels(); } lastOutputStreamStatus = StreamStatus.IDLE; outputHandler.handleStreamStatus(lastOutputStreamStatus); } else if (channelStatuses[channelIndex].watermark == lastOutputWatermark) { // 重新找newMinWatermark,且有可能 newMinWatermark > channelStatuses[channelIndex].watermark findAndOutputNewMinWatermarkAcrossAlignedChannels(); } } else if (streamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isIdle()) { // 处理idle -> active的切换 channelStatuses[channelIndex].streamStatus = StreamStatus.ACTIVE; // 对齐 if (channelStatuses[channelIndex].watermark >= lastOutputWatermark) { channelStatuses[channelIndex].isWatermarkAligned = true; } // 重新激活当前算子 if (lastOutputStreamStatus.isIdle()) { lastOutputStreamStatus = StreamStatus.ACTIVE; outputHandler.handleStreamStatus(lastOutputStreamStatus); } } }