概述 flink 是分布式流、批一体化平台,提供了数据分发、通信、容错的功能,flink的批处理构建在流式处理之上,支持迭代计算、内存管理等功能
示例 依赖
核心依赖:flink本身运行的时候包含的依赖,不包含连接器
对应的maven依赖
1 2 3 4 5 6 7 8 9 10 11 12 <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-java</artifactId > <version > 1.10.0</version > <scope > provided</scope > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-streaming-java_2.11</artifactId > <version > 1.10.0</version > <scope > provided</scope > </dependency >
注意事项:在idea中建议将scope更改为compile,否则会导致应用运行的时候抛出NoClassDefFountError
应用依赖:特定的应用程序所需要的依赖,如我们从kafka中消费对应的数据需要连接器
maven 依赖:
1 2 3 4 5 <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-connector-kafka-0.10_2.11</artifactId > <version > 1.10.0</version > </dependency >
推荐添加 Maven Shade Plugin 去构建应用,将应用程序代码及其所有需要的依赖项打包到一个 jar-with-dependencies 的 jar 包中 ,并且这些
第三方的依赖应该设置为compile,具体可以参考官网!
基本概念 Flink是实现了分布式集合转换的标准化平台,通过addSource来创建集合,并可以应用一系列的转换操作,最终可以将数据通过addSink的方式写入到
对应的存储系统 。Flink 的source分为两种:
有界的:DataSet API,对应的Env为StreamingExecutionEnvironment
无界的:DataStream API,对应的Env为ExecutionEnvironment
对于上面的两种类型的数据,均是不可变的数据集合,也正是由于不可变,因此数据在并行、并发的处理的过程中才不需要考虑线程安全带来的问题,至于
因此而导致的数据量的问题则是通过data share来缩减的。(待查阅)
示例代码 batch data 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.aggregation.Aggregations;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.util.Collector;public class WordCount { public static void main (String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.fromElements("hello world ! ni hao a ! ha ha ha !" , "ni shi shui a ?" ) .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap (String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split(" " ); for (String word : words) { collector.collect(new Tuple2<String, Integer>(word, 1 )); } } }) .groupBy(0 ).aggregate(Aggregations.SUM, 1 ) .print(); } }
streaming data 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;public class StreamingWordCount { public static void main (String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> windowCounts = env.socketTextStream("localhost" , 9000 ).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap (String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split(" " ); for (String word : words) { collector.collect(new Tuple2<String, Integer>(word, 1 )); } } }).keyBy(0 ).timeWindow(Time.seconds(10 )).sum(1 ); windowCounts.print(); env.execute(); } }
上面我们完成整个作业的定义之后,还需要调用environment的execute来运行即可,该方法会返回一个JobExecutionResult的结果,包含了执行耗时和累加器
的结果,如果不需要等待作业的执行结束,而仅仅是触发作业,可以使用executeAsync方法,该方法会返回一个JobClient对象,该对象可以和对应的作业进行交互。
值得注意的是只有在调用了execute方法之后,作业才真正的执行,其之前的各种操作都是lazy的,主要的作用适用于生成JobGraph(大数据的核心即在于数据不动,
程序动,这里的程序也就是我们生成的JobGraph,真正的执行过程就是将程序序列化发送到集群中的机器进行执行)。
DataStream API Source 我们从Source中获取原始数据,我们可以通过StreamExecutionEnvironment.addSource(sourceFunction)来添加一个Source,我们通常可以实现一个SourceFunction接口
来串行的处理数据,也可以实现ParallelSourceFunction或者RichParallelSourceFunction接口来并发的处理数据流。当前已经存在一些定义好的
source可供我们使用(env#):
基于文件:readFile、readTextFile &&
基于socket:socketTextStream
基于集合:fromCollection、fromElements、fromParallelCollection、generateSequence(用于生成序列)
第三方库:可能业务中最常见的数据源就是kafka,flink也为kafka提供了一套sorce:FlinkKafkaConsumer,这里的 和kafka版本是对应起来的
Sink Sink用于消费DataStream中的数据,并将其推送到文件、socket或者打印出来,Flink提供了大量的依托于DataStream的Sink:
writeAsText
writeAsCsv
print
writeUsingOutputFormat
writeToSocket
addSink
上面的这些算子中,addSink通常可以结合Flink的checkpoint来实现exactly-once的语义来消费数据。而write*多用于调试操作
Iterators(这里不是太明白) flink 实现了迭代式流IterativeStream,由于DataStream可能永远不会完成,因此没有最大的迭代次数。有时候我们需要使用split来指定
流的哪一部分反馈给迭代,使用filter指定哪一部分向后输出,要关闭迭代输出的话需要使用IterativeStream的closeWith(feedbackStream)方法,
这里定义的feedbackStream将会反馈给迭代头部,使用filter可以定义向下传播的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 DataStream<Long> someIntegers = env.generateSequence(0 , 1000 ); IterativeStream<Long> iteration = someIntegers.iterate(); DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() { @Override public Long map (Long value) throws Exception { return value - 1 ; } }); DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter (Long value) throws Exception { return (value > 0 ); } }); iteration.closeWith(stillGreaterThanZero); DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter (Long value) throws Exception { return (value <= 0 ); } });
运行时参数 水位线 容错 延时 默认情况下,流中元素的传递并不是一个接一个的传递,而是一批数据传递,这一批数据的大小是可以通过配置文件控制的,不过
这样带来的坏处就是会有延时,为了控制延时,我们通常可以使用env.setBufferTimeout(timeoutMillis)的方式,这样
当buffer中的数据还没有填满,但是已经超时的情况下,数据就会被强制flush到下游去,默认的超时时间是100ms。setBufferTimeout(-1)
可以使得缓冲区不填满就不发送,setBufferTimeout(0)则会使得数据到来之后就发出去,这样就会由批式变成流式,生产环境中应该避免这样。
调试及运行
说明:
对于上述示例,在windows上可以使用 ncat -lk 9000来打开端口,对于linux平台则可以使用 nc -lk 9000 来完成端口的开启
处理数据特点(key based) 在flink(其他分布式计算框架也同理)提供的各种转换操作通常都需要依赖于key(主要用来对数据进行分组),对于flink来说,key是有一定的要求的。(flink的数据模型不是键值对?)
Tuple Tuple的有效下标从0开始,不过对于多级嵌套的Tuple的话,没办法选到内层的Tuple相关的字段,这种情况下只能使用字段表达式来进行解决。
字段表达式 所谓的字段表达式其实就是我们在后台中常说的POJO(通常使用PojoSerializer序列化的协议),具体规则如下:
根据字段名称选择 POJO 的字段。
根据字段名称或 0 开始的字段索引选择 Tuple 的字段。例如 “f0” 和 “5” 分别指 Java Tuple 类型的第一个和第六个字段(这里的f是指field)。
可以选择 POJO 和 Tuple 的嵌套字段。 例如,一个 POJO 类型有一个“user”字段还是一个 POJO 类型,那么 “user.zip” 即指这个“user”字段的“zip”字段。任意嵌套和混合的 POJO 和 Tuple都是支持的 ,例如 “f1.user.zip” 或 “user.f3.1.zip”。
_可以使用 “*” 通配符表达式选择完整的类型。这也适用于非 Tuple 或 POJO 类型_???
键选择器 可以通过KeySelector来最大程度的定制化键的选择,该函数将单个上流对象作为输入,并返回按照key算子操作之后的数据。
转换函数 转换函数一般是用户的核心逻辑,当前可以采用的定义转换函数的方式分别是:
实现接口
匿名类
lambda函数
富含数 :对应的接口的命名规则是在接口的前面加上RichXXX,富函数为用户定义函数(map、reduce 等)额外提供了 4 个方法: open、close、getRuntimeContext 和 setRuntimeContext。这些方法有助于向函数传参(请参阅 向函数传递参数)、 创建和终止本地状态、访问广播变量(请参阅 广播变量)、访问诸如累加器和计数器等运行时信息(请参阅 累加器和计数器)和迭代信息(请参阅 迭代)。
累加器 累加器对于快速了解数据非常有用,不过当前累加器只有在job终止的时候才可以获取,因此能力有限,且应该是不能用于stream数据的记录,因为stream
是无界的,_不会存在终止,不知道是不是存在其他的方式可以使用起来,待确定!_。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package com.flink;import org.apache.flink.api.common.JobExecutionResult;import org.apache.flink.api.common.accumulators.IntCounter;import org.apache.flink.api.common.functions.RichFlatMapFunction;import org.apache.flink.api.common.functions.RichMapFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.aggregation.Aggregations;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.util.Collector;import java.util.Arrays;public class AccumulatorTest { public static void main (String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> line = env.fromCollection(Arrays.asList("hello world ni hao a !" )); DataSet<Tuple2<String, Integer>> res =line.map(new RichMapFunction<String, String>() { private IntCounter counter = new IntCounter(); @Override public void open (Configuration parameters) throws Exception { super .open(parameters); getRuntimeContext().addAccumulator("total" , this .counter); } @Override public String map (String s) throws Exception { return s; } }).flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap (String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split(" " ); for (String word : words) { getRuntimeContext().getAccumulator("total" ).add(1 ); collector.collect(new Tuple2<String, Integer>(word, 1 )); } } }).groupBy("f0" ).aggregate(Aggregations.SUM, 1 ); res.writeAsText("hello.txt" ); JobExecutionResult result = env.execute(); System.out.println(result.getAccumulatorResult("total" )); } }
Stream API详解 事件时间 – watermark应该是可以理解为算子时间的
watermark是flink为了处理Eventtime窗口计算提出的一种机制,本质上也是时间。自定义的watermark会按照定制的策略生成一种系统的event,并会将
这种event发送到下游的算子。接收到watermark的算子也因此可以调节自己的EventTime Clock(这里的Event Time可以近似的认为是算子的一种特性)。
对于单流来说,WaterMark是单调递增的。
flink当前支持两种方式来产生watermark:
Punctuated:数据流中每一个递增的EventTime都会产生一个watermark。这种方式会产生大量的Watermark对应的Event,会对
下游的处理造成很大的压力。只有在实时性要求很高的场景才会采用这种方式生成watermark
Periodic:周期性(按照一定的时间间隔或者数据在达到一定的条数之后)产生一个watermark,生产环境中这种生成watermark的方式要求周期性包含时间
和数量两个纬度上来生成。因为极端情况下,比如数据来的很慢,但是时间已经过去很久了,我们其实是很有必要针对这种数据来生成watermark的,来触发窗口操作。
回过头来我们在看看Watermark机制如何解决上面的问题,上面的问题在于如何将迟来的EventTime 位11的元素正确处理。要解决这个问题我们还需要先了解一下
EventTime window是如何触发的? EventTime window 计算条件是当Window计算的Timer(在设定slide窗口之后timer会被切分成左闭右开的时间区间)
时间戳 小于等于 当前系统的Watermak的时间戳时候进行计算
下面来对比一下watermark的作用:
不使用watermark的window计算 https://blog.csdn.net/lmalds/article/details/52704170
使用watermark的window计算 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 package com.flink;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.watermark.Watermark;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;import javax.annotation.Nullable;import java.util.Iterator;public class WaterMarkDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); executionEnvironment.getConfig().setAutoWatermarkInterval(50L ); executionEnvironment.setParallelism(1 ); System.out.println(executionEnvironment.getParallelism());; DataStreamSource<String> stream = executionEnvironment.socketTextStream("localhost" , 9000 ); SingleOutputStreamOperator<Tuple2<String, Long>> aaa = stream.map(new MapFunction<String, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map (String s) throws Exception { String[] words = s.split("," ); return new Tuple2<String, Long>(words[0 ], Long.parseLong(words[1 ])); } }); aaa.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() { Long currentTimestamp = Long.MIN_VALUE; @Nullable @Override public Watermark getCurrentWatermark () { return new Watermark(currentTimestamp-3000 ); } @Override public long extractTimestamp (Tuple2<String, Long> stringLongTuple2, long l) { currentTimestamp = stringLongTuple2.f1; return currentTimestamp; } }) .keyBy(e -> e.f0) .window(TumblingEventTimeWindows.of(Time.seconds(1L ))) .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() { @Override public void apply (String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception { Iterator<Tuple2<String, Long>> iterator = input.iterator(); while (iterator.hasNext()){ out.collect(iterator.next().toString()); } } }).print(); executionEnvironment.execute(); } }
在上述的处理过程中,先是设置了并发度为1,然后通过本地调测发现,流中触发计算的时间点卡在startTime + window+3000s的时间点,
这是由于上文设置了eventTime来处理数据,并且watermark设置了延迟的最大时间,因此这几个时间点加在一起就构成了触发的时间,
,还需要更复杂的测试示例来验证更多特性 ,例如使用事件事件导致数据丢点的发生,猜测原因可能是一旦事件事件到达了之后,剩下的还没有到达的事件
就会被丢弃了,因此,这种情况下应该是可以设置延迟事件来解决的,否则,一旦触发后续的点就会丢掉。
另外在测试的过程中,发现如果并发度设置为2,则结果并不会输出,原因未知。
https://yq.aliyun.com/articles/666056?spm=5176.10695662.1996646101.searchclickresult.1aa346a5kMFGqV
https://blog.csdn.net/lmalds/article/details/52704170
数据序列化 flink kafka结合
执行管理操作 广播 最近遇到一个需求,flink需要定时从数据库中读取数据加载到内存中,然后作为一份全局的配置来使用,自然就想起了广播这种模式,
之前有接触过spark,因此对于广播有过一些认知,那就是spark中的broadcast是不可变的(不过看stackoverflow上有解决办法)。
对应到flink这边,广播则是可以实现配置的动态更新。因此抽时间写了一个demo,以便后面查看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 package com.flink;import org.apache.flink.api.common.state.MapStateDescriptor;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.BroadcastStream;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.util.Collector;import java.util.HashMap;import java.util.Map;public class FlinkBroadcast { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); MapStateDescriptor<Void, Map<String, String>> stateDescriptor = new MapStateDescriptor<Void, Map<String, String>> ("test" , Types.VOID, Types.MAP(Types.STRING, Types.STRING)); BroadcastStream<Map<String, String>> broadcastStream = env.addSource(new RichSourceFunction<Map<String, String>>() { private String[] name = new String[]{"wes" , "wq" , "wwl" }; private Integer[] age = new Integer[]{1 , 2 , 3 }; @Override public void run (SourceContext<Map<String, String>> sourceContext) throws Exception { int i = 0 ; while (true ) { Map<String, String> res = new HashMap<>(); Thread.sleep(5000 ); res.put(name[i % 3 ], name[i % 3 ] + age[i % 3 ]); i++; sourceContext.collect(res); } } @Override public void cancel () { } }).broadcast(stateDescriptor); DataStream<String> dataStream = env.addSource(new RichSourceFunction<String>() { private String[] test = {"hello" , "world" , "!" }; @Override public void run (SourceContext<String> sourceContext) throws Exception { int i = 0 ; while (true ) { sourceContext.collect(test[i % 3 ]); Thread.sleep(1000 ); System.out.println("i: " + i); i++; } } @Override public void cancel () { } }); dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, Map<String, String>, String>() { Map<String, String> map = null ; @Override public void open (Configuration parameters) throws Exception { super .open(parameters); map = new HashMap<>(); map.put("start" , "start num0" ); System.out.println("init broadcast connection" ); } @Override public void processElement (String s, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception { collector.collect("stream record: " + s + ", broad value is" + map.toString()); } @Override public void processBroadcastElement (Map<String, String> stringStringMap, Context context, Collector<String> collector) throws Exception { map = stringStringMap; } }).print(); env.execute(); } }
对应的输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 init broadcast connection stream record: hello, broad value is{start=start num0} i: 0 stream record: world, broad value is{start=start num0} i: 1 stream record: !, broad value is{start=start num0} i: 2 stream record: hello, broad value is{start=start num0} i: 3 stream record: world, broad value is{start=start num0} i: 4 stream record: !, broad value is{start=start num0} i: 5 stream record: hello, broad value is{wes=wes1} i: 6 stream record: world, broad value is{wes=wes1} i: 7 stream record: !, broad value is{wes=wes1} i: 8 stream record: hello, broad value is{wes=wes1} i: 9 stream record: world, broad value is{wes=wes1} i: 10 stream record: !, broad value is{wq=wq2} i: 11 stream record: hello, broad value is{wq=wq2} i: 12 stream record: world, broad value is{wq=wq2} i: 13 stream record: !, broad value is{wq=wq2} ....
可以看到通过这种方式可以实现广播数据的更新,一般的情况下也是可以满足我们的业务需求了。
部署 小节 native stream和mini batch有什么区别???
代码执行参考: https://riptutorial.com/apache-flink/example/27898/wordcount
未完待续。。。。