Flink源码分析-Watermark

简介

窗口机制是Flink流处理的核心,它将无限的流元素分割成有限的窗口。当一个窗口不再增加新元素时,就可以对这个窗口中的所有元素执行计算逻辑。而判断窗口不会再增加新元素的方式有:时间(Watermark)、计数、自定义。其中Watermark代表事件发生时的时间戳或者Flink收到事件时的时间戳。

Watermark可以分为三个部分来理解:

  • 来源 Watermark的来源,它是如何产生的?在不同的配置下,它的产生方式有什么不同?
  • 流转 当产生了一个Watermark时,它流转到WindowOperator时都会经过哪些过程?
  • 触发 当WindowOperator收到Watermark时,如何处理?这个过程是通过触发器来处理的,用户可以自定义触发器,也可以使用系统提供的触发器,可参考下EventTimeTrigger的实现。本章不再叙述

注意:Watermark如果不能及时产生或者流转到WindowOperator就会造成窗口计算的延迟甚至导致窗口一直不会被触发。

Watermark来源

Watermark的来源有三种:

  1. AutomaticWatermarkContext自动以系统时间来做为Watermark,只能发生在SourceTask
  2. 用户主动调用SourceContext.emitWark(),只能发生在SourceTask
  3. TimestampsAndPeriodicWatermarksOperator从elemet中提取出Watermark,通过定时任务发射Watermark,这一步骤可以发生在拓扑图中的任意一个环节。

SourceContext的初始化

首先看下SourceStream算子的启动过程:

SourceStream算子的启动过程

SourceContext的作用主要是用于发射elements,也有可能发射Watermark,从Source算子启动的流程图中可以看出一共它有三种类型。SourceStream在启动的时候会根据配置的时间类型来选择对应的SourceContext,它的直接子类只有两个WatermarkContextNonTimestampContext

WatermarkContext处理与Watermark相关的操作。也要维护当前Stream的StreamStatus,使stream的状态能正确传递给下游。这样做是因为流的状态能会影响下游到对Watermark的处理。下游处理StreamStatusWatermark的相关逻辑可以参考StatusWatermarkValve,它会决定元素是否继续传递给下游。

WatermarkContext 又有两个子类:

  • AutomaticWatermarkContext
    对应的配置是IngestionTime。它会启动一个定时任务,以固定的时间间隔从系统中提取系统时间作为Watermark发射到下游。

  • ManualWatermarkContext
    对应的配置是EventTime。用户通过它提供的方法可以主动发射Watermark

NonTimestampContext发射的所有元素都不会携带时间戳,并且它也不不能发射Watermark。对应的配置是ProcessingTime,它不会产生Watermark,而是由WindowOperator在收到新元素时,直接根据当前系统时间判断是否要触发计算逻辑。

AutomaticWatermarkContext逻辑

注册定时任务

1
2
3
4
5
6
7
8
9
private AutomaticWatermarkContext(...) {
super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
// 初始化过程
...
// 注册一个定时发射任务
long now = this.timeService.getCurrentProcessingTime();
this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
new WatermarkEmittingTask(this.timeService, checkpointLock, output));
}

当收到一个元素时,会判断是否要发射Watermark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void processAndCollect(T element) {
// 取系统时间做为Watermark
lastRecordTime = this.timeService.getCurrentProcessingTime();
// 发射元素
output.collect(reuse.replace(element, lastRecordTime));

// this is to avoid lock contention in the lockingObject by
// sending the watermark before the firing of the watermark
// emission task.
if (lastRecordTime > nextWatermarkTime) {
// in case we jumped some watermarks, recompute the next watermark time
final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval);
nextWatermarkTime = watermarkTime + watermarkInterval;
output.emitWatermark(new Watermark(watermarkTime));
}
}

定时任务 WatermarkEmittingTask,它是AutomaticWatermarkContext的内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private class WatermarkEmittingTask implements ProcessingTimeCallback {
...

@Override
public void onProcessingTime(long timestamp) {
final long currentTime = timeService.getCurrentProcessingTime();

synchronized (lock) {
// we should continue to automatically emit watermarks if we are active
if (streamStatusMaintainer.getStreamStatus().isActive()) {
if (idleTimeout != -1 && currentTime - lastRecordTime > idleTimeout) {
// 检测stream的状态,如果超过一定时间没有元素进来,就会将此stream置位空闲状态从而影响下有对watermark的处理
markAsTemporarilyIdle();
cancelNextIdleDetectionTask();
} else if (currentTime > nextWatermarkTime) {
// 发射
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
output.emitWatermark(new Watermark(watermarkTime));
nextWatermarkTime = watermarkTime + watermarkInterval;
}
}
}
long nextWatermark = currentTime + watermarkInterval;
// 注册下一次的定时任务
nextWatermarkTimer = this.timeService.registerTimer(nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
}
}

TimestampsAndPeriodicWatermarksOperator

是一个StreamOperator,可以在拓扑图中的任一环节。当job的时间类型为EventTime时,在拓扑图中需要增加这个算子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class TimestampsAndPeriodicWatermarksOperator {
...

public void open() throws Exception {
...

currentWatermark = Long.MIN_VALUE;
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
// 如果间隔大于0表示开启定时发射watermark任务
if (watermarkInterval > 0) {
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}

public void processElement(StreamRecord<T> element) throws Exception {
final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
// 将element的时间戳替换为从elemnt中提取的时间戳
output.collect(element.replace(element.getValue(), newTimestamp));
}

public void onProcessingTime(long timestamp) throws Exception {
// 从elemnt中提取时间戳
// 这里有一个问题,如果上游一直没有发送元素并且也没切换StreamStatus就会导致watermark阻塞到下游,不能再向前传递
Watermark newWatermark = userFunction.getCurrentWatermark();
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
output.emitWatermark(newWatermark);
}
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

public void processWatermark(Watermark mark) throws Exception {
// 忽略上游传递的watermarks,仅传递当前算子产生的时间戳。除了代表流结束的watermark
if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
currentWatermark = Long.MAX_VALUE;
output.emitWatermark(mark);
}
}
}

下游对Watermark的处理

下游接收上游数据的流程图:
下游接收上游数据过程

从流程图中可以很清楚的看出来,Watermark、StreamStatus都是交给StatusWatermarkValve来进行处理。它相当于一个阀门,当收到水位线或者流的状态的时候判断是否满足相应的条件,如果满足才会再继续发射到下游。

首先看下阀门的一个内部类InputChannelStatus,这个类非常重要,它封装了上游InputChannel的状态,在计算过程中都会用到。

1
2
3
4
5
6
7
8
protected static class InputChannelStatus {
// 最近一个水位线
long watermark;
// 上游状态,Active 或者 Idle
StreamStatus streamStatus;
// 标示上游的水位线是否对齐,只有对齐时才会使用水位线
boolean isWatermarkAligned;
}

处理水位线的大致流程如下:
处理水位线

这个流程复杂,重点看下如何找最小的水位线,这一步骤决定是否发射新的水位线到下游。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;
// 找最小的水位线
for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned) {
hasAlignedChannels = true;
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
}
}
// 由于所有的InputChannelStatus默认都是激活、对齐,且水位线=Long.MIN_VALUE。
// 如果某个上游(InputChannel)一直没有发送新的水位线且也没有切换状态。
// 这样newMinWatermark的值一直都是Long.MIN_VALUE,从而不会发射新的watermark
// 上述情况在EventTime时,可能会发生。
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
lastOutputWatermark = newMinWatermark;
outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
}
}

处理StreamStatus的过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void inputStreamStatus(StreamStatus streamStatus, int channelIndex) {
if (streamStatus.isIdle() && channelStatuses[channelIndex].streamStatus.isActive()) {
// 处理active -> idle的切换
channelStatuses[channelIndex].streamStatus = StreamStatus.IDLE;
// 当channel位idle状态时,设置为非对齐状态,不会影响找newMinWatermark
channelStatuses[channelIndex].isWatermarkAligned = false;

// 如果所有的上游都为idle,就会给下游发送idle,并且当前的算子状态也置为idle
if (!InputChannelStatus.hasActiveChannels(channelStatuses)) {

if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
findAndOutputMaxWatermarkAcrossAllChannels();
}
lastOutputStreamStatus = StreamStatus.IDLE;
outputHandler.handleStreamStatus(lastOutputStreamStatus);
} else if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
// 重新找newMinWatermark,且有可能 newMinWatermark > channelStatuses[channelIndex].watermark
findAndOutputNewMinWatermarkAcrossAlignedChannels();
}
} else if (streamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isIdle()) {
// 处理idle -> active的切换
channelStatuses[channelIndex].streamStatus = StreamStatus.ACTIVE;
// 对齐
if (channelStatuses[channelIndex].watermark >= lastOutputWatermark) {
channelStatuses[channelIndex].isWatermarkAligned = true;
}
// 重新激活当前算子
if (lastOutputStreamStatus.isIdle()) {
lastOutputStreamStatus = StreamStatus.ACTIVE;
outputHandler.handleStreamStatus(lastOutputStreamStatus);
}
}
}

评论