Flink系列之Flink之Time和WaterMark深入剖析
创始人
2024-03-22 21:28:28
0

title: Flink系列


一、Flink Window 常见需求背景

1.0 理论描述

需求描述:

每隔 5 秒,计算最近 10 秒单词出现的次数。 滑动窗口

每隔 5 秒,计算最近 5 秒单词出现的次数。 滚动窗口

在这里插入图片描述

第一个: 关于 TimeCharacteristic

	ProcessingTimeIngestionTimeEventTime

TimeCharacteristic在源码中的位置:

路径: org.apache.flink.streaming.api.TimeCharacteristic

在这里插入图片描述

第二个:SlidingProcessingTimeWindows 可以拆分为:Sliding + ProcessingTime + TimeWindows,是 WindowAssigner 的子类

常见的是下面四类:

	SlidingProcessingTimeWindowsSlidingEventTimeWindowsTumblingEventTimeWindowsTumblingProcessingTimeWindows

SlidingProcessingTimeWindows 在源码中的位置:

路径:org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows

在这里插入图片描述

WindowAssigner的一些子类。

在这里插入图片描述

1.1 TimeWindow 实现

代码如下:

package com.aa.flinkjava.window;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** @Author AA* @Project bigdatapre* @Package com.aa.flinkjava.window* 时间窗口第一个案例,入门版* 时间窗口版本  WordCount* 需求: 每隔5秒,统计最近10秒的数据*/
public class TimeWindow01_TimeWindow {public static void main(String[] args) throws Exception {//1、获取环境对象StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();//使用 老的API timeWindow(Time.seconds(10),Time.seconds(5)) 。需要设置如下参数://executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//2、输入源DataStreamSource dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);//3、逻辑处理SingleOutputStreamOperator> result = dataStreamSource.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(new Tuple2(word, 1));}}}).keyBy(tuple -> tuple.f0)//每隔5秒,统计最近10秒的数据//下面timeWindow(Time.seconds(10),Time.seconds(5)) 是老的API。// 老的API,如果想要使用,需要在前面设置:executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//.timeWindow(Time.seconds(10),Time.seconds(5))//下面是新的版本的使用方式 window.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);//4、输出result.print();//5、启动执行executionEnvironment.execute();}
}

1.2 ProcessWindowFunction

代码如下:

package com.aa.flinkjava.window;import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** @Author AA* @Project bigdatapre* @Package com.aa.flinkjava.window* 时间窗口第二个案例  就是为了看一下内部的实现细节。* 需求: 每隔5秒,统计最近10秒的数据*/
public class TimeWindow02_ProcessWindowFunction {public static void main(String[] args) throws Exception {//1、获取环境对象StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();//使用 老的API timeWindow(Time.seconds(10),Time.seconds(5)) 。需要设置如下参数://executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//2、输入源DataStreamSource dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);//3、逻辑处理SingleOutputStreamOperator> result = dataStreamSource.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(new Tuple2(word, 1));}}}).keyBy(tuple -> tuple.f0)//每隔5秒,统计最近10秒的数据//下面timeWindow(Time.seconds(10),Time.seconds(5)) 是老的API。// 老的API,如果想要使用,需要在前面设置:executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//.timeWindow(Time.seconds(10),Time.seconds(5))//下面是新的版本的使用方式 window.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))//.sum(1);.process(new MySumProcessFunction());//4、输出result.print();//5、启动执行executionEnvironment.execute();}/*** ProcessWindowFunction[IN, OUT, KEY, W <: Window]* Type parameters:* IN – The type of the input value.* OUT – The type of the output value.* KEY – The type of the key.* W – The type of the window*///注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。static class MySumProcessFunction extends ProcessWindowFunction,Tuple2,String,TimeWindow>{FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void process(String key, Context context, Iterable> elements, Collector> out) throws Exception {System.out.println("==========出发窗口的分界线=========");System.out.println("当前系统时间: " + df.format(System.currentTimeMillis()));System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));int count = 0;for (Tuple2 element : elements) {count++;}out.collect(Tuple2.of(key,count));}}}

1.3 Flink Time 种类

Flink 的数据流处理定义了三种 Time,分别是:

  1. Event Time:事件产生的时间,它通常由事件中的时间戳描述。
  2. Ingestion time:事件进入 Flink 的时间(一般不用)
  3. Processing Time:事件被处理时当前系统的时间

官网的一张图,如下:

在这里插入图片描述

举例解释:现有一条生产日志样例,如下

2022-11-11 19:00:01 134 INFO executor.Executor: Finished task in state 0.0

这条数据进入 Flink 的时间是 2022-11-11 20:00:00 102,到达 window 处理的时间为 2022-11-11 20:00:01 100

则对应的 三个 Time 分别是:

Event time:2022-11-11 19:00:01 134
Ingestion time:2022-11-11 20:00:00 102
Processing time:2022-11-11 20:00:01 100

在企业生产环境中,一般使用 EventTime 来进行计算,会更加符合业务需求。比如下述需求:

统计每分钟内接口调用失败的错误日志个数。
统计每分钟每种类型商品的成交单数。

接下来后续的知识点,就是在告诉大家,如果事件是无序的。所有的事件按照 event time 是乱序到达的

假设数据有序,基于 Process Time Window 做处理有问题么? 没问题
假设数据无序,基于 Process Time Window 做处理有问题么? 有问题
解决方案:基于 eventTime  从去执行处理,会纠正部分结果,不会把所有计算都算正确
解决方案:基于 Flink 提供的 watermark 实现这个需求

最终的结论:Flink 基于 Window + EventTime + Watermark 联合起来完成乱序数据的处理

// 如果基于 evnetTime 和 water 去实现乱序数据的处理
.assignTimestampsAndWatermarks(// 指定 watermark 的规则WatermarkStrategy.forGenerator((ctx) -> new PeriodicWatermarkGenerator())// 指定 eventTime 的定义.withTimestampAssigner((ctx) -> new TimeStampExtractor())) //指定时间字段

二、Process Time Window(有序)

2.1 需求(理论)

需求:每隔 5秒 计算最近 10秒 的单词出现的次数(类似于需求:接口调用出错的次数)

会产生的窗口有:

20:55:00 - 20:55:10
20:55:05 - 20:55:15
20:55:10 - 20:55:20
....

自定义source,模拟:第 13 秒的时候连续发送 2 个事件,第 16 秒的时候再发送 1 个事件

在这里插入图片描述

2.2 代码

代码如下:

package com.aa.flinkjava.window;import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;/*** @Author AA* @Project bigdatapre* @Package com.aa.flinkjava.window** 自定义Source方式进行 单词统计* 单词过来的顺序 是 正序 的*/
public class TimeWindow03_WithMySource1 {public static void main(String[] args) throws Exception {//1、获取环境对象StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();//使用 老的API timeWindow(Time.seconds(10),Time.seconds(5)) 。需要设置如下参数://executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//2、输入源DataStreamSource dataStreamSource = executionEnvironment.addSource(new MySource());//3、逻辑处理SingleOutputStreamOperator> result = dataStreamSource.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(new Tuple2(word, 1));}}}).keyBy(tuple -> tuple.f0)//每隔5秒,统计最近10秒的数据//下面timeWindow(Time.seconds(10),Time.seconds(5)) 是老的API。// 老的API,如果想要使用,需要在前面设置:executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//.timeWindow(Time.seconds(10),Time.seconds(5))//下面是新的版本的使用方式 window.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))//.sum(1);.process(new MySumProcessFunction());//4、输出result.print();//5、启动执行executionEnvironment.execute();}//注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。static class MySumProcessFunction extends ProcessWindowFunction,Tuple2,String, TimeWindow> {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void process(String key, Context context, Iterable> elements, Collector> out) throws Exception {System.out.println("==========出发窗口的分界线=========");System.out.println("当前系统时间: " + df.format(System.currentTimeMillis()));System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));int count = 0;for (Tuple2 element : elements) {count++;}out.collect(Tuple2.of(key,count));}}static class MySource implements SourceFunction{FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void run(SourceContext ctx) throws Exception {String currentTime = String.valueOf(System.currentTimeMillis());System.out.println("判断条件之前的currentTime : " + currentTime);//下面这个判断的操作是为了保证是 10 s 的倍数while (Integer.valueOf(currentTime.substring(currentTime.length() - 4)) > 100) {currentTime = String.valueOf(System.currentTimeMillis());//System.out.println("while里面的currentTime : " + currentTime);continue;}System.out.println("判断条件之后的currentTime : " + currentTime);System.out.println("当前时间:" + df.format(System.currentTimeMillis()));/*** 当前时间:15:46:30* ==========出发窗口的分界线=========* 当前系统时间: 15:46:45* 当前窗口处理时间: 15:46:45* 当前窗口开始时间: 15:46:35* 当前窗口结束时间: 15:46:45* 7> (flink,2)* ==========出发窗口的分界线=========* 当前系统时间: 15:46:50* 当前窗口处理时间: 15:46:50* 当前窗口开始时间: 15:46:40* 当前窗口结束时间: 15:46:50* 7> (flink,3)* ==========出发窗口的分界线=========* 当前系统时间: 15:46:55* 当前窗口处理时间: 15:46:55* 当前窗口开始时间: 15:46:45* 当前窗口结束时间: 15:46:55* 7> (flink,1)*///开始之后第12秒的时候放进去两个单词数据TimeUnit.SECONDS.sleep(12);ctx.collect("flink");ctx.collect("flink");//开始之后第12+4秒的时候放进去一个单词数据TimeUnit.SECONDS.sleep(4);ctx.collect("flink");TimeUnit.SECONDS.sleep(3600);}@Overridepublic void cancel() {}}}

2.3 图解

在这里插入图片描述

三、Process Time Window(无序)

3.1 需求(理论)

自定义 Source,模拟

1、正常情况下第 13 秒的时候连续发送 2 个事件
2、但是有一个事件在第 13 秒的发送出去成功了,另外一个事件数据在 19 秒的时候才发送出去
3、在第 16 秒的时候再发送 1 个事件

3.2 代码

代码如下:

package com.aa.flinkjava.window;import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;/*** @Author AA* @Project bigdatapre* @Package com.aa.flinkjava.window** 自定义Source方式进行 单词统计* 单词过来的顺序 是 乱序 的** 如果数据乱序到达的,基于ProcessingTime进行处理会有什么现象?*/
public class TimeWindow04_WithMySource2 {public static void main(String[] args) throws Exception {//1、获取环境对象StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();//使用 老的API timeWindow(Time.seconds(10),Time.seconds(5)) 。需要设置如下参数://executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//2、输入源DataStreamSource dataStreamSource = executionEnvironment.addSource(new MySource());//3、逻辑处理SingleOutputStreamOperator> result = dataStreamSource.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(new Tuple2(word, 1));}}}).keyBy(tuple -> tuple.f0)//每隔5秒,统计最近10秒的数据//下面timeWindow(Time.seconds(10),Time.seconds(5)) 是老的API。// 老的API,如果想要使用,需要在前面设置:executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//.timeWindow(Time.seconds(10),Time.seconds(5))//下面是新的版本的使用方式 window.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))//.sum(1);.process(new MySumProcessFunction());//4、输出result.print();//5、启动执行executionEnvironment.execute();}//注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。static class MySumProcessFunction extends ProcessWindowFunction,Tuple2,String, TimeWindow> {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void process(String key, Context context, Iterable> elements, Collector> out) throws Exception {System.out.println("==========触发窗口的分界线=========");System.out.println("当前系统时间: " + df.format(System.currentTimeMillis()));System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));int count = 0;for (Tuple2 element : elements) {count++;}out.collect(Tuple2.of(key,count));}}static class MySource implements SourceFunction {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void run(SourceContext ctx) throws Exception {String currentTime = String.valueOf(System.currentTimeMillis());//下面这个判断的操作是为了保证是 10 s 的倍数while (Integer.valueOf(currentTime.substring(currentTime.length() - 4)) > 100) {currentTime = String.valueOf(System.currentTimeMillis());continue;}System.out.println("当前时间:" + df.format(System.currentTimeMillis()));//开始之后第12秒的时候放进去两个单词数据TimeUnit.SECONDS.sleep(12);String log = "flink";ctx.collect(log);//开始之后第12+4秒的时候放进去一个单词数据TimeUnit.SECONDS.sleep(4);ctx.collect("flink");//数据出现了延迟,本来这条数据是应该第12秒的时候处理的,现在拖到了第19秒才处理输出。TimeUnit.SECONDS.sleep(3);ctx.collect(log);TimeUnit.SECONDS.sleep(3600);}@Overridepublic void cancel() {}}
}

3.3 图解

在这里插入图片描述

四、使用 EventTime 处理无序

4.1 需求

由于在 三 中的上述程序执行得到的结果,并不是需求实现,所以需要改进,我们通过 Flink 提供的 EventTime 来改进

4.2 代码

代码如下:

package com.aa.flinkjava.window;import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.concurrent.TimeUnit;/*** @Author AA* @Project bigdatapre* @Package com.aa.flinkjava.window* 使用EventTime初步处理乱序数据*/
public class TimeWindow05_WithMySource3_ByEventTime {public static void main(String[] args) throws Exception {//1、获取环境对象StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();//2、输入源DataStreamSource dataStreamSource = executionEnvironment.addSource(new MySource());//3、逻辑处理SingleOutputStreamOperator> result = dataStreamSource.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) throws Exception {String[] words = value.split(",");//给传输过来的字段进行拆分。  给数据和时间戳分开//第一个元素 是 数据 flink ,第二个是时间戳out.collect(Tuple2.of(words[0],Long.valueOf(words[1])));}})//指定日志当中的时间戳字段当做这个事件的EventTime。需要通过下面的assignTimestampsAndWatermarks 来指定。//withTimestampAssigner 就是用来指定时间戳定义的。//Watermark 在这个案例中先不管,后面案例专门讲解。.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(context -> new MyWaterGenerator()).withTimestampAssigner(context -> new MyTimestampAssigner())).keyBy(tuple -> tuple.f0)//每隔5秒,统计最近10秒的数据.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))//.sum(1);.process(new MySumProcessFunction());//4、输出result.print();//5、启动执行executionEnvironment.execute("TimeWindow05_WithMySource3_ByEventTime");}//注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。static class MySumProcessFunction extends ProcessWindowFunction,Tuple2,String, TimeWindow> {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void process(String key, Context context, Iterable> elements, Collector> out) throws Exception {System.out.println("==========触发窗口的分界线=========");System.out.println("当前系统时间: " + df.format(System.currentTimeMillis()));System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));Long count = 0L;for (Tuple2 element : elements) {count++;}out.collect(Tuple2.of(key,count));}}static class MySource implements SourceFunction {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void run(SourceContext ctx) throws Exception {String currentTime = String.valueOf(System.currentTimeMillis());//下面这个判断的操作是为了保证是 10 s 的倍数while (Integer.valueOf(currentTime.substring(currentTime.length() - 4)) > 100) {currentTime = String.valueOf(System.currentTimeMillis());continue;}System.out.println("当前时间:" + df.format(System.currentTimeMillis()));//开始之后第13秒的时候放进去两个单词数据TimeUnit.SECONDS.sleep(13);//给日志打上一个时间戳的标记String log = "flink," + System.currentTimeMillis();String log1 = log;ctx.collect(log);//开始之后第13+3秒的时候放进去一个单词数据TimeUnit.SECONDS.sleep(3);ctx.collect("flink," + System.currentTimeMillis());//数据出现了延迟,本来这条数据是应该第13秒的时候处理的,现在拖到了第19秒才处理输出。TimeUnit.SECONDS.sleep(3);ctx.collect(log1);TimeUnit.SECONDS.sleep(3600);}@Overridepublic void cancel() {}}/*** 指定时间字段*/static class MyWaterGenerator implements WatermarkGenerator>, Serializable{FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void onEvent(Tuple2 event, long eventTimestamp, WatermarkOutput output) {System.out.println("eventTimestamp: " + eventTimestamp);System.out.println("eventTimestamp: " + df.format(eventTimestamp));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}}/*** 指定eventTime的字段*/static class MyTimestampAssigner implements TimestampAssigner>{@Overridepublic long extractTimestamp(Tuple2 element, long recordTimestamp) {return element.f1;}}}

4.3 图解

在这里插入图片描述

五、使用 WaterMark 机制解决无序

5.1 需求

根据上述的测试发现:

纠正了第三个窗口的计算结果
第一个窗口的计算结果依然是错的

解决方案:需要让第一个窗口延迟一段时间再执行计算,也就是等待 第三条数据接收到的时候,再执行计算,就能得到正确结果。Flink 提供了 Watermark 机制来帮我们解决这个问题。

5.2 代码

代码如下:

package com.aa.flinkjava.window;import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.concurrent.TimeUnit;/*** @Author AA* @Project bigdatapre* @Package com.aa.flinkjava.window*/
public class TimeWindow06_ByWaterMark01 {public static void main(String[] args) throws Exception {//1、获取环境对象StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(1);//2、输入源DataStreamSource dataStreamSource = executionEnvironment.addSource(new MySource());//3、逻辑处理SingleOutputStreamOperator> result = dataStreamSource.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) throws Exception {String[] words = value.split(",");//给传输过来的字段进行拆分。  给数据和时间戳分开out.collect(Tuple2.of(words[0],Long.valueOf(words[1])));}})// 基于eventTime 和water 去实现乱序数据的处理.assignTimestampsAndWatermarks(//指定watermark的规则WatermarkStrategy.forGenerator((context1) -> new MyWaterGenerator())//指定eventTime的定义.withTimestampAssigner((context1) -> new MyTimestampAssigner())).keyBy(tuple -> tuple.f0)//每隔5秒,统计最近10秒的数据.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))//.sum(1);.process(new MySumProcessFunction());//4、输出result.print();//5、启动执行executionEnvironment.execute("TimeWindow06_ByWaterMark01");}//注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。static class MySumProcessFunction extends ProcessWindowFunction,Tuple2,String, TimeWindow> {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void process(String key, Context context, Iterable> elements, Collector> out) throws Exception {System.out.println("==========触发窗口的分界线=========");System.out.println("当前process系统时间: " + df.format(System.currentTimeMillis()));System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));Long count = 0L;for (Tuple2 element : elements) {count++;}out.collect(Tuple2.of(key,count));}}/*** 自定义数据源*/static class MySource implements SourceFunction {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void run(SourceContext ctx) throws Exception {String currentTime = String.valueOf(System.currentTimeMillis());//下面这个判断的操作是为了保证是 10 s 的倍数while (Integer.valueOf(currentTime.substring(currentTime.length() - 4)) > 100) {currentTime = String.valueOf(System.currentTimeMillis());continue;}System.out.println("当前时间:" + df.format(System.currentTimeMillis()));//开始之后第13秒的时候放进去两个单词数据TimeUnit.SECONDS.sleep(13);String log = "flink," + System.currentTimeMillis();String log1 = log;ctx.collect(log);//开始之后第13+3秒的时候放进去一个单词数据TimeUnit.SECONDS.sleep(3);ctx.collect("flink," + System.currentTimeMillis());//数据出现了延迟,本来这条数据是应该第13秒的时候处理的,现在拖到了第19秒才处理输出。TimeUnit.SECONDS.sleep(3);ctx.collect(log1);TimeUnit.SECONDS.sleep(3000);}@Overridepublic void cancel() {}}/*** 指定时间字段** WatermarkGenerator: watermark 生成器。 一个接口。* 用的时候自己写个实现这个接口就行了。*/static class MyWaterGenerator implements WatermarkGenerator>, Serializable {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");//每次接受到一条数据,其实就执行了一次处理。@Overridepublic void onEvent(Tuple2 event, long eventTimestamp, WatermarkOutput output) {System.out.println("eventTimestamp: " + eventTimestamp);System.out.println("eventTimestamp: " + df.format(eventTimestamp));}/*** 定期发送 watermark* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//在这个里面指定延迟5秒钟处理。output.emitWatermark(new Watermark(System.currentTimeMillis() - 5000));}}/*** 指定eventTime的字段*/static class MyTimestampAssigner implements TimestampAssigner> {@Overridepublic long extractTimestamp(Tuple2 element, long recordTimestamp) {return element.f1;}}
}

5.3 图解

在这里插入图片描述

六、Flink Watermark 机制定义

​ Flink 使用 EventTime 的时候如何处理乱序数据?

​ 我们知道,流处理从事件产生,到流经 Source,再到 Operator,中间是有一个过程和时间的。虽然大部分情况下,流到 Operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因,导致乱序的产生,特别是使用 kafka 的话,多个分区的数据无法保证有序。所以在进行 window 计算的时候,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发 window 去进行计算了。这个特别的机制,就是 Watermark ,Watermark 是用于处理乱序事件的。Watermark 可以翻译为水位线

	WaterMark 是一种度量 event Time 进度机制,watermark 作为数据流中的一部分,在 Stream 中流动,并携带 time stamp,一个 WaterMark(t) 表明在流中处理的 EventTime 已经到达了 t,那么在流中就不会再有 Event Time 小于 t 的时间产生了 。

有序的流的 Watermarks

在这里插入图片描述

无序的流的 Watermarks

在这里插入图片描述

多并行度的流的 Watermarks

在这里插入图片描述

七、深入理解 Flink Watermark

7.1 理论

需求:得到并打印每隔 3 秒钟统计相同的 key 的所有的事件(string),相当于就是单词计数,每 3s 统计一次

简单总结一下:每隔 3s 做一次单词统计,这是一个滚动窗口的计算需求。

背景:里面的数据可能就是乱序

解决方案:通过 Flink Window 和 Watermark 来解决

当前知识点的重点: 观测:window 是什么时候触发

在这里插入图片描述

7.2 代码

代码如下:

package com.aa.flinkjava.window;import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.ArrayList;/*** @Author AA* @Project bigdatapre* @Package com.aa.flinkjava.window*** 测试性数据:* 数据, eventTime* flink,1641756862000* flink,1641756866000* flink,1641756872000* flink,1641756873000* flink,1641756874000* flink,1641756876000* flink,1641756877000*/
public class TimeWindow06_ByWaterMark02 {public static void main(String[] args) throws Exception {//1、获取环境对象StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(1);//2、输入源DataStreamSource dataStreamSource = executionEnvironment.socketTextStream("hadoop12",9999);//3、逻辑处理SingleOutputStreamOperator result = dataStreamSource.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) throws Exception {String[] words = value.split(",");//给传输过来的字段进行拆分。  给数据和时间戳分开out.collect(Tuple2.of(words[0],Long.valueOf(words[1])));}})// 基于eventTime 和water 去实现乱序数据的处理.assignTimestampsAndWatermarks(//指定watermark的规则WatermarkStrategy.forGenerator((context1) -> new MyWaterGenerator())//指定eventTime的定义.withTimestampAssigner((context1) -> new MyTimestampAssigner())).keyBy(tuple -> tuple.f0)//每隔5秒,统计最近10秒的数据//注意下面是 SlidingEventTimeWindows  。不是  SlidingProcessingTimeWindows 。.window(TumblingEventTimeWindows.of(Time.seconds(3)))//.sum(1);.process(new MySumProcessFunction());//4、输出result.print();//5、启动执行executionEnvironment.execute("TimeWindow06_ByWaterMark02");}//注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。static class MySumProcessFunction extends ProcessWindowFunction,String,String, TimeWindow> {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void process(String key, Context context, Iterable> elements, Collector out) throws Exception {System.out.println("==========触发窗口的分界线=========");//System.out.println("当前process系统时间: " + df.format(System.currentTimeMillis()));System.out.println("处理时间: " + df.format(context.currentProcessingTime()));System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));ArrayList list = new ArrayList<>();for (Tuple2 element : elements) {list.add(element.toString() + "|" + df.format(element.f1));}out.collect(list.toString());System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));}}/*** 指定时间字段** WatermarkGenerator: watermark 生成器。 一个接口。* 用的时候自己写个实现这个接口就行了。*/static class MyWaterGenerator implements WatermarkGenerator>, Serializable {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");//当前的窗口里面的最大的事件时间private long currentMaxEventTime = 0L;//设置最大的允许乱序的时间 , 假如 是 10秒  。// 延迟时间private long maxOutOfOrderTime = 10000;//每次接受到一条数据,其实就执行了一次处理。@Overridepublic void onEvent(Tuple2 event, long eventTimestamp, WatermarkOutput output) {//更新记录窗口中的最大的 EventTimelong currentElementEventTime = event.f1;currentMaxEventTime = Math.max(currentMaxEventTime,currentElementEventTime);System.out.println("event = " + event// Event Time  事件时间+ " | " + df.format(event.f1)// Max Event Time  最大事件时间+ " | " + df.format(currentMaxEventTime)// Current Watermark  watermark时间+ " | " + df.format(currentMaxEventTime - maxOutOfOrderTime));}/*** 定期发送 watermark* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//在这个里面指定延迟 maxOutOfOrderTime 秒钟处理。output.emitWatermark(new Watermark(currentMaxEventTime - maxOutOfOrderTime));}}/*** 指定eventTime的字段*/static class MyTimestampAssigner implements TimestampAssigner> {@Overridepublic long extractTimestamp(Tuple2 element, long recordTimestamp) {return element.f1;}}
}

执行结果日志:

event = (flink,1641756862000) | 03:34:22 | 03:34:22 | 03:34:12
event = (flink,1641756866000) | 03:34:26 | 03:34:26 | 03:34:16
event = (flink,1641756872000) | 03:34:32 | 03:34:32 | 03:34:22
event = (flink,1641756873000) | 03:34:33 | 03:34:33 | 03:34:23
event = (flink,1641756874000) | 03:34:34 | 03:34:34 | 03:34:24
==========触发窗口的分界线=========
处理时间: 17:31:46
当前窗口开始时间: 03:34:21
[(flink,1641756862000)|03:34:22]
当前窗口结束时间: 03:34:24

7.3 计算 window 的触发时间模拟

keyEvent TimecurrentMaxTimestampcurrentWatermark
00119:34:2219:34:2219:34:12
keyEvent TimecurrentMaxTimestampcurrentWatermark
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
keyEvent TimecurrentMaxTimestampcurrentWatermark
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
keyEvent TimecurrentMaxTimestampcurrentWatermark
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
00119:34:3619:34:3619:34:26
keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
00119:34:3619:34:3619:34:26
00119:34:3719:34:3719:34:27[19:34:2419:34:27)

总结:Flink Window 触发的时间:

1、watermark 时间 >= window_end_time
2、在 [window_start_time, window_end_time) 区间中有数据存在,注意是左闭右开的区间,而且是以 event time 来计算的

7.4 WaterMark+Window 处理乱序时间

输入数据如下:

之前的数据:

flink,1641756862000
flink,1641756866000
flink,1641756872000
flink,1641756873000
flink,1641756874000
flink,1641756876000
flink,1641756877000

下面是新补的乱序数据:

flink,1641756879000
flink,1641756871000
flink,1641756883000

下面是模拟的过程

keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
00119:34:3619:34:3619:34:26
00119:34:3719:34:3719:34:27[19:34:2419:34:27)
00119:34:3919:34:3919:34:29
00119:34:3119:34:3919:34:29
keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
00119:34:3619:34:3619:34:26
00119:34:3719:34:3719:34:27[19:34:2419:34:27)
00119:34:3919:34:3919:34:29
00119:34:3119:34:3919:34:29
00119:34:4319:34:4319:34:33[19:34:3019:34:33)

八、Flink 处理太过延迟数据

下图中黄颜色的数据 10 就属于迟到太多的数据,该数据本该在 W(11) 之前执行计算

在这里插入图片描述

一般有三种处理方式:

1、延迟太多的数据,直接丢弃,这个方式是 Flink 的默认方式
2、allowedLateness 指定允许数据延迟的时间(不推荐使用)
3、sideOutputLateData 收集迟到的数据,这是大多数企业里面使用的情况,推荐使用

8.1 Flink 丢弃延迟太多的数据

默认的方式。

根据代码执行结果可知,当数据延迟太多,就会直接丢弃。

8.2 Flink 指定允许再次迟到的时间

代码

总结:

当我们设置允许迟到 2 秒的事件,第一次 window 触发的条件是 watermark >= window_end_time
第二次(或者多次)触发的条件是 watermark < window_end_time + allowedLateness

8.3 Flink 收集迟到的数据单独处理

代码

九、Flink 多并行度 Watermark

一个 window 可能会接受到多个 waterMark,我们以最小的为准。

在这里插入图片描述

代码后面放到仓库中



声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

相关内容

热门资讯

汽车油箱结构是什么(汽车油箱结... 本篇文章极速百科给大家谈谈汽车油箱结构是什么,以及汽车油箱结构原理图解对应的知识点,希望对各位有所帮...
美国2年期国债收益率上涨15个... 原标题:美国2年期国债收益率上涨15个基点 美国2年期国债收益率上涨15个基...
嵌入式 ADC使用手册完整版 ... 嵌入式 ADC使用手册完整版 (188977万字)💜&#...
重大消息战皇大厅开挂是真的吗... 您好:战皇大厅这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...
盘点十款牵手跑胡子为什么一直... 您好:牵手跑胡子这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游...
senator香烟多少一盒(s... 今天给各位分享senator香烟多少一盒的知识,其中也会对sevebstars香烟进行解释,如果能碰...
终于懂了新荣耀斗牛真的有挂吗... 您好:新荣耀斗牛这款游戏可以开挂,确实是有挂的,需要了解加客服微信8435338】很多玩家在这款游戏...
盘点十款明星麻将到底有没有挂... 您好:明星麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【5848499】很多玩家在这款游戏...
总结文章“新道游棋牌有透视挂吗... 您好:新道游棋牌这款游戏可以开挂,确实是有挂的,需要了解加客服微信【7682267】很多玩家在这款游...
终于懂了手机麻将到底有没有挂... 您好:手机麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...