Overview

During development I have been using Spark Streaming to process monitoring data. Along the way I ran into, and solved, a number of problems, and I am recording them here so they are not forgotten.

Details

Before going into detail, let's first look at an example combining Spark Streaming with Kafka:

package com.spark;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;

public class SparkDemo {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("wes").setMaster("local[2]");

        // One micro-batch every 10 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Kafka connection parameters
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "g1");

        Set<String> topics = new HashSet<>();
        topics.add("test");

        // Direct (receiver-less) stream: each Kafka partition maps to an RDD partition
        JavaPairDStream<String, String> dStream = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        // Keep only the message value and print each batch
        dStream.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterable<String> call(Tuple2<String, String> record) throws Exception {
                return Collections.singletonList(record._2);
            }
        }).print();

        jssc.start();
        jssc.awaitTermination();
    }
}

Integrating with Kafka

  • The Kafka 9093 port issue
  • Committing offsets manually
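With the direct stream API used in the example above, offsets are not stored in ZooKeeper automatically, so exactly-once semantics require tracking them yourself. A minimal sketch of reading per-batch offset ranges, assuming the same spark-streaming-kafka 0.8 dependency as the example; `saveOffsets` is a hypothetical helper standing in for your own offset store (e.g. ZooKeeper or a database), not part of Spark:

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

// Inside the streaming job, after createDirectStream:
dStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    @Override
    public void call(JavaPairRDD<String, String> rdd) throws Exception {
        // The direct stream exposes the Kafka offset range consumed by this batch
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // ... process the batch here first, then persist the offsets ...

        for (OffsetRange range : ranges) {
            // saveOffsets is a hypothetical helper writing to your own store
            saveOffsets(range.topic(), range.partition(), range.untilOffset());
        }
    }
});
```

Committing only after processing succeeds gives at-least-once delivery; writing results and offsets in one transaction is what upgrades this to exactly-once.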

Broadcast variables

A broadcast variable is defined as follows:

final Broadcast<String> name = jssc.sparkContext().broadcast("hello world");

Once defined, the value can be read anywhere in the program. A broadcast variable cannot be updated, and after calling name.destroy(), any further access fails with the following error:

org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at SparkDemo.java:43) 
at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:69)
at com.spark.SparkDemo$1.call(SparkDemo.java:44)
at com.spark.SparkDemo$1.call(SparkDemo.java:37)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:170)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:170)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
at scala.collection.AbstractIterator.to(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
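Since the broadcast itself is immutable, a common workaround for "updating" one is to unpersist the old broadcast on the driver and create a fresh one between batches. A minimal sketch; the `BroadcastHolder` wrapper below is illustrative, not a Spark API:

```java
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Illustrative driver-side holder: swap in a fresh broadcast when data changes.
public class BroadcastHolder {
    private final AtomicReference<Broadcast<String>> ref = new AtomicReference<>();

    public void update(JavaStreamingContext jssc, String newValue) {
        Broadcast<String> old = ref.get();
        if (old != null) {
            // Release executor-side copies, but do NOT destroy(): in-flight
            // tasks may still be reading the old value (see the error above)
            old.unpersist();
        }
        ref.set(jssc.sparkContext().broadcast(newValue));
    }

    public Broadcast<String> get() {
        return ref.get();
    }
}
```

Calling unpersist() instead of destroy() is the key point: an unpersisted broadcast is lazily re-fetched if a task still references it, whereas a destroyed one throws the SparkException shown above.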

Summary