概述

flink 是分布式流、批一体化平台,提供了数据分发、通信、容错的功能,flink的批处理构建在流式处理之上,支持迭代计算、内存管理等功能

示例

依赖

  • 核心依赖:flink本身运行的时候包含的依赖,不包含连接器

对应的maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>

注意事项:在idea中建议将scope更改为compile,否则会导致应用运行的时候抛出NoClassDefFountError

  • 应用依赖:特定的应用程序所需要的依赖,如我们从kafka中消费对应的数据需要连接器

maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.10.0</version>
</dependency>

推荐添加 Maven Shade Plugin 去构建应用,将应用程序代码及其所有需要的依赖项打包到一个 jar-with-dependencies 的 jar 包中,并且这些 第三方的依赖应该设置为compile,具体可以参考官网!

基本概念

Flink是实现了分布式集合转换的标准化平台,通过addSource来创建集合,并可以应用一系列的转换操作,最终可以将数据通过addSink的方式写入到 对应的存储系统。Flink 的source分为两种:

  • 有界的:DataSet API,对应的Env为StreamingExecutionEnvironment
  • 无界的:DataStream API,对应的Env为ExecutionEnvironment

对于上面的两种类型的数据,均是不可变的数据集合,也正是由于不可变,因此数据在并行、并发的处理的过程中才不需要考虑线程安全带来的问题,至于 因此而导致的数据量的问题则是通过data share来缩减的。(待查阅)

示例代码

batch data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

public static void main(String[] args) throws Exception {

// 这里可以有多种方式来获取Environment,之所以使用getEnvironment来获取,是因为该方法会根据上下文完成正确的
// 工作,也就是说如果在idea中运行代码的话,会创建本机的环境,而如果通过命令行的方式提交到集群的话则会返回集群
// 上的执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements("hello world ! ni hao a ! ha ha ha !", "ni shi shui a ?")
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split(" ");
for (String word : words) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
})// 按照tuple的第0个字段进行分组并按照第一个字段加和
.groupBy(0).aggregate(Aggregations.SUM, 1)
// 这里是sink,会自动触发execute,因此此处不需要再用execute
.print();
// env.execute("wc");
}
}

streaming data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingWordCount {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> windowCounts = env.socketTextStream("localhost", 9000).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split(" ");
for (String word : words) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
}).keyBy(0).timeWindow(Time.seconds(10)).sum(1);

windowCounts.print();

// 此处不同于batch模式
env.execute();
}
}

上面我们完成整个作业的定义之后,还需要调用environment的execute来运行即可,该方法会返回一个JobExecutionResult的结果,包含了执行耗时和累加器 的结果,如果不需要等待作业的执行结束,而仅仅是触发作业,可以使用executeAsync方法,该方法会返回一个JobClient对象,该对象可以和对应的作业进行交互。 值得注意的是只有在调用了execute方法之后,作业才真正的执行,其之前的各种操作都是lazy的,主要的作用适用于生成JobGraph(大数据的核心即在于数据不动, 程序动,这里的程序也就是我们生成的JobGraph,真正的执行过程就是将程序序列化发送到集群中的机器进行执行)。

DataStream API

Source

我们从Source中获取原始数据,我们可以通过StreamExecutionEnvironment.addSource(sourceFunction)来添加一个Source,我们通常可以实现一个SourceFunction接口 来串行的处理数据,也可以实现ParallelSourceFunction或者RichParallelSourceFunction接口来并发的处理数据流。当前已经存在一些定义好的 source可供我们使用(env#):

  • 基于文件:readFile、readTextFile &&
  • 基于socket:socketTextStream
  • 基于集合:fromCollection、fromElements、fromParallelCollection、generateSequence(用于生成序列)
  • 第三方库:可能业务中最常见的数据源就是kafka,flink也为kafka提供了一套sorce:FlinkKafkaConsumer,这里的和kafka版本是对应起来的

Transform

Sink

Sink用于消费DataStream中的数据,并将其推送到文件、socket或者打印出来,Flink提供了大量的依托于DataStream的Sink:

  • writeAsText
  • writeAsCsv
  • print
  • writeUsingOutputFormat
  • writeToSocket
  • addSink

上面的这些算子中,addSink通常可以结合Flink的checkpoint来实现exactly-once的语义来消费数据。而write*多用于调试操作

Iterators(这里不是太明白)

flink 实现了迭代式流IterativeStream,由于DataStream可能永远不会完成,因此没有最大的迭代次数。有时候我们需要使用split来指定 流的哪一部分反馈给迭代,使用filter指定哪一部分向后输出,要关闭迭代输出的话需要使用IterativeStream的closeWith(feedbackStream)方法, 这里定义的feedbackStream将会反馈给迭代头部,使用filter可以定义向下传播的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});

运行时参数

水位线
容错
延时

默认情况下,流中元素的传递并不是一个接一个的传递,而是一批数据传递,这一批数据的大小是可以通过配置文件控制的,不过 这样带来的坏处就是会有延时,为了控制延时,我们通常可以使用env.setBufferTimeout(timeoutMillis)的方式,这样 当buffer中的数据还没有填满,但是已经超时的情况下,数据就会被强制flush到下游去,默认的超时时间是100ms。setBufferTimeout(-1) 可以使得缓冲区不填满就不发送,setBufferTimeout(0)则会使得数据到来之后就发出去,这样就会由批式变成流式,生产环境中应该避免这样。

调试及运行


说明: 对于上述示例,在windows上可以使用 ncat -lk 9000来打开端口,对于linux平台则可以使用 nc -lk 9000 来完成端口的开启


处理数据特点(key based)

在flink(其他分布式计算框架也同理)提供的各种转换操作通常都需要依赖于key(主要用来对数据进行分组),对于flink来说,key是有一定的要求的。(flink的数据模型不是键值对?)

Tuple

Tuple的有效下标从0开始,不过对于多级嵌套的Tuple的话,没办法选到内层的Tuple相关的字段,这种情况下只能使用字段表达式来进行解决。

字段表达式

所谓的字段表达式其实就是我们在后台中常说的POJO(通常使用PojoSerializer序列化的协议),具体规则如下:

  • 根据字段名称选择 POJO 的字段。
  • 根据字段名称或 0 开始的字段索引选择 Tuple 的字段。例如 “f0” 和 “5” 分别指 Java Tuple 类型的第一个和第六个字段(这里的f是指field)。
  • 可以选择 POJO 和 Tuple 的嵌套字段。 例如,一个 POJO 类型有一个“user”字段还是一个 POJO 类型,那么 “user.zip” 即指这个“user”字段的“zip”字段。任意嵌套和混合的 POJO 和 Tuple都是支持的,例如 “f1.user.zip” 或 “user.f3.1.zip”。
  • _可以使用 “*” 通配符表达式选择完整的类型。这也适用于非 Tuple 或 POJO 类型_???

键选择器

可以通过KeySelector来最大程度的定制化键的选择,该函数将单个上流对象作为输入,并返回按照key算子操作之后的数据。

转换函数

转换函数一般是用户的核心逻辑,当前可以采用的定义转换函数的方式分别是:

  • 实现接口
  • 匿名类
  • lambda函数
  • 富含数:对应的接口的命名规则是在接口的前面加上RichXXX,富函数为用户定义函数(map、reduce 等)额外提供了 4 个方法: open、close、getRuntimeContext 和 setRuntimeContext。这些方法有助于向函数传参(请参阅 向函数传递参数)、 创建和终止本地状态、访问广播变量(请参阅 广播变量)、访问诸如累加器和计数器等运行时信息(请参阅 累加器和计数器)和迭代信息(请参阅 迭代)。

累加器

累加器对于快速了解数据非常有用,不过当前累加器只有在job终止的时候才可以获取,因此能力有限,且应该是不能用于stream数据的记录,因为stream 是无界的,_不会存在终止,不知道是不是存在其他的方式可以使用起来,待确定!_。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.flink;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class AccumulatorTest {

public static void main(String[] args) throws Exception {

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> line = env.fromCollection(Arrays.asList("hello world ni hao a !"));
DataSet<Tuple2<String, Integer>> res =line.map(new RichMapFunction<String, String>() {

private IntCounter counter = new IntCounter();

// 对于计数器或者累加器需要在rich对象中进行注册
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
getRuntimeContext().addAccumulator("total", this.counter);
}

@Override
public String map(String s) throws Exception {
return s;
}
}).flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split(" ");
for (String word : words) {
// 统计单词的个数
getRuntimeContext().getAccumulator("total").add(1);
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
}).groupBy("f0").aggregate(Aggregations.SUM, 1);

res.writeAsText("hello.txt");

JobExecutionResult result = env.execute();

System.out.println(result.getAccumulatorResult("total"));
}
}

Stream API详解

事件时间

– watermark应该是可以理解为算子时间的

watermark是flink为了处理Eventtime窗口计算提出的一种机制,本质上也是时间。自定义的watermark会按照定制的策略生成一种系统的event,并会将 这种event发送到下游的算子。接收到watermark的算子也因此可以调节自己的EventTime Clock(这里的Event Time可以近似的认为是算子的一种特性)。 对于单流来说,WaterMark是单调递增的。

flink当前支持两种方式来产生watermark:

  • Punctuated:数据流中每一个递增的EventTime都会产生一个watermark。这种方式会产生大量的Watermark对应的Event,会对 下游的处理造成很大的压力。只有在实时性要求很高的场景才会采用这种方式生成watermark
  • Periodic:周期性(按照一定的时间间隔或者数据在达到一定的条数之后)产生一个watermark,生产环境中这种生成watermark的方式要求周期性包含时间 和数量两个纬度上来生成。因为极端情况下,比如数据来的很慢,但是时间已经过去很久了,我们其实是很有必要针对这种数据来生成watermark的,来触发窗口操作。

回过头来我们在看看Watermark机制如何解决上面的问题,上面的问题在于如何将迟来的EventTime 位11的元素正确处理。要解决这个问题我们还需要先了解一下 EventTime window是如何触发的? EventTime window 计算条件是当Window计算的Timer(在设定slide窗口之后timer会被切分成左闭右开的时间区间) 时间戳 小于等于 当前系统的Watermak的时间戳时候进行计算

下面来对比一下watermark的作用:

不使用watermark的window计算

https://blog.csdn.net/lmalds/article/details/52704170

使用watermark的window计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.util.Iterator;

public class WaterMarkDemo {


public static void main(String[] args) throws Exception {

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.getConfig().setAutoWatermarkInterval(50L);
executionEnvironment.setParallelism(1);
System.out.println(executionEnvironment.getParallelism());;

DataStreamSource<String> stream = executionEnvironment.socketTextStream("localhost", 9000);

SingleOutputStreamOperator<Tuple2<String, Long>> aaa = stream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
String[] words = s.split(",");
return new Tuple2<String, Long>(words[0], Long.parseLong(words[1]));
}
});
aaa.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

Long currentTimestamp = Long.MIN_VALUE;

@Nullable
@Override
public Watermark getCurrentWatermark() {
// System.out.println("current timestamp is: " + currentTimestamp);
return new Watermark(currentTimestamp-3000);
}

@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
// 更新时间
currentTimestamp = stringLongTuple2.f1;
return currentTimestamp;
}
})
.keyBy(e -> e.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(1L)))
.apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {

Iterator<Tuple2<String, Long>> iterator = input.iterator();
while (iterator.hasNext()){

out.collect(iterator.next().toString());
}
}
}).print();

executionEnvironment.execute();
}
}

在上述的处理过程中,先是设置了并发度为1,然后通过本地调测发现,流中触发计算的时间点卡在startTime + window+3000s的时间点, 这是由于上文设置了eventTime来处理数据,并且watermark设置了延迟的最大时间,因此这几个时间点加在一起就构成了触发的时间, ,还需要更复杂的测试示例来验证更多特性,例如使用事件事件导致数据丢点的发生,猜测原因可能是一旦事件事件到达了之后,剩下的还没有到达的事件 就会被丢弃了,因此,这种情况下应该是可以设置延迟事件来解决的,否则,一旦触发后续的点就会丢掉。 另外在测试的过程中,发现如果并发度设置为2,则结果并不会输出,原因未知。

https://yq.aliyun.com/articles/666056?spm=5176.10695662.1996646101.searchclickresult.1aa346a5kMFGqV

https://blog.csdn.net/lmalds/article/details/52704170

数据序列化

flink kafka结合

执行管理操作

广播

最近遇到一个需求,flink需要定时从数据库中读取数据加载到内存中,然后作为一份全局的配置来使用,自然就想起了广播这种模式, 之前有接触过spark,因此对于广播有过一些认知,那就是spark中的broadcast是不可变的(不过看stackoverflow上有解决办法)。 对应到flink这边,广播则是可以实现配置的动态更新。因此抽时间写了一个demo,以便后面查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.flink;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class FlinkBroadcast {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

MapStateDescriptor<Void, Map<String, String>> stateDescriptor = new MapStateDescriptor<Void, Map<String, String>>
("test", Types.VOID, Types.MAP(Types.STRING, Types.STRING));

BroadcastStream<Map<String, String>> broadcastStream = env.addSource(new RichSourceFunction<Map<String, String>>() {

private String[] name = new String[]{"wes", "wq", "wwl"};

private Integer[] age = new Integer[]{1, 2, 3};

@Override
public void run(SourceContext<Map<String, String>> sourceContext) throws Exception {

int i = 0;


while (true) {
Map<String, String> res = new HashMap<>();
Thread.sleep(5000);
res.put(name[i % 3], name[i % 3] + age[i % 3]);
i++;
sourceContext.collect(res);
}
}

@Override
public void cancel() {

}
}).broadcast(stateDescriptor);


DataStream<String> dataStream = env.addSource(new RichSourceFunction<String>() {

private String[] test = {"hello", "world", "!"};

@Override
public void run(SourceContext<String> sourceContext) throws Exception {
int i = 0;
while (true) {
sourceContext.collect(test[i % 3]);
Thread.sleep(1000);
System.out.println("i: " + i);
i++;
}
}

@Override
public void cancel() {

}
});

dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, Map<String, String>, String>() {

Map<String, String> map = null;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
map = new HashMap<>();
map.put("start", "start num0");
System.out.println("init broadcast connection");
}

@Override
public void processElement(String s, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {

collector.collect("stream record: " + s + ", broad value is" + map.toString());
}

@Override
public void processBroadcastElement(Map<String, String> stringStringMap, Context context, Collector<String> collector) throws Exception {
map = stringStringMap;
}
}).print();

env.execute();
}
}

对应的输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
init broadcast connection
stream record: hello, broad value is{start=start num0}
i: 0
stream record: world, broad value is{start=start num0}
i: 1
stream record: !, broad value is{start=start num0}
i: 2
stream record: hello, broad value is{start=start num0}
i: 3
stream record: world, broad value is{start=start num0}
i: 4
stream record: !, broad value is{start=start num0}
i: 5
stream record: hello, broad value is{wes=wes1}
i: 6
stream record: world, broad value is{wes=wes1}
i: 7
stream record: !, broad value is{wes=wes1}
i: 8
stream record: hello, broad value is{wes=wes1}
i: 9
stream record: world, broad value is{wes=wes1}
i: 10
stream record: !, broad value is{wq=wq2}
i: 11
stream record: hello, broad value is{wq=wq2}
i: 12
stream record: world, broad value is{wq=wq2}
i: 13
stream record: !, broad value is{wq=wq2}
....

可以看到通过这种方式可以实现广播数据的更新,一般的情况下也是可以满足我们的业务需求了。

部署

小节

native stream和mini batch有什么区别???

代码执行参考: https://riptutorial.com/apache-flink/example/27898/wordcount 未完待续。。。。