Spark Streaming reading from Kafka: an example
Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/xzj9581/article/details/79223826

A Spark Streaming example that reads from Kafka. Within it:

The graceful-shutdown strategy for Spark Streaming is adapted from:
http://qindongliang.iteye.com/blog/2404100

Managing the offsets of the Kafka data consumed by Spark Streaming follows:
http://qindongliang.iteye.com/blog/2401194

Writing data from Spark into Kafka follows:

http://blog.csdn.net/bsf5521/article/details/76635867?locationNum=9&fps=1
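
The full listing follows as a single file. To compile it you would need roughly the following sbt dependencies (a sketch only; the artifact versions are assumptions and should be matched to your cluster):

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"                  % "2.2.0" % "provided",
  "org.apache.spark"  %% "spark-streaming"            % "2.2.0" % "provided",
  "org.apache.spark"  %% "spark-streaming-kafka-0-10" % "2.2.0",
  "org.apache.kafka"  %% "kafka"                      % "0.10.2.1",          // for kafka.utils.ZkUtils
  "org.eclipse.jetty"  % "jetty-server"               % "9.3.20.v20170531"   // embedded shutdown endpoint
)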


// Imports assumed by the listing below (Spark 2.x with the spark-streaming-kafka-0-10 integration,
// the Kafka broker's ZkUtils, and an embedded Jetty server). SparkSqlUtils is an external helper,
// not shown in the post, that builds a SparkSession.
import java.io.{BufferedInputStream, ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.Future
import java.util.{ArrayList, Properties}
import java.{lang => jl, util => ju}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import kafka.utils.ZkUtils
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.eclipse.jetty.server.handler.{AbstractHandler, ContextHandler}
import org.eclipse.jetty.server.{Request, Server}

object Demo {

val LOGGER = org.apache.log4j.LogManager.getLogger(Demo.getClass)
val PROP = conf
lazy val kafkaProducer = KafkaSink[String, String](KafkaHelper.getProducerConfigs())

def main(args: Array[String]): Unit = {
//create the StreamingContext
val ssc = createStreamingContext()

//start the job
ssc.start()

//start the daemon that accepts stop requests: http://192.168.1.XX:3443/close/Demo
daemonHttpServer(3443, ssc) //graceful shutdown triggered over HTTP

//wait for the job to terminate
ssc.awaitTermination()
}

/**
* Create the StreamingContext.
* @return the configured StreamingContext
*/
def createStreamingContext(): StreamingContext = {
conf //load config.properties into the system properties before anything reads them

val ssc = StreamingUtils.getStreamingContext(Demo.getClass.getSimpleName)
val spark = SparkSqlUtils.createSpark(this.getClass.getSimpleName()); //external helper (not shown) that returns a SparkSession
var offsetRanges: Array[OffsetRange] = null

// Create direct kafka stream with brokers and topics
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](KafkaHelper.getTopic(), KafkaHelper.getConsumerConfigs(), KafkaHelper.readOffset(Demo.getClass.getSimpleName)))

val windowsmessage = messages.transform(rdd => {
// transform gives us each batch RDD, so capture its Kafka offset ranges here for a manual commit later
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
})
.map(record => {
try {
// extract the JSON payload
val value = record.value()
// record.topic()
if (value == null) {
// invalid data is mapped to null TODO
null
} else {
value
}
} catch {
case e: Throwable => {
LOGGER.error("Failed to parse Kafka record: " + record, e)
e.printStackTrace()
throw e
}
}
})
// drop invalid records
.filter(r => {
// filter out nulls
if (null == r) {
false
} else {
true
}
})
// aggregations etc.
//.groupByKey() aggregate here if the job needs it
.foreachRDD(rdd => {
// convert the RDD to a DataFrame
// build a schema by hand if needed:
// val schema = StructType(structFileds)
// val dataframe = sparkSession.createDataFrame(rowRDD, schema)
// dataframe.createOrReplaceTempView("tablename");
// then join against other tables for the real computation
val json = spark.read.json(rdd)
json.createOrReplaceTempView("json")
spark.sql("select unix_timestamp() from json")
val result = spark.sql("select unix_timestamp() a")
// runs on the driver
result.foreachPartition(itRow => {
// runs on the executors
itRow.foreach(f => {
// write each row back to Kafka; column "a" is a bigint, so convert it to a String
val fut = kafkaProducer.send("topic", "key", f.getAs[Long]("a").toString)
fut.get //synchronous: block until the broker acknowledges the send
})
})
try {
// persist the Kafka offsets for this batch
KafkaHelper.saveOffset(Demo.getClass.getSimpleName, offsetRanges);
} catch {
case e: Throwable => {
LOGGER.error("Failed to save offsets. " + offsetRanges, e)
offsetRanges.foreach { x => LOGGER.error("offsetRanges:" + x.toString()) }
throw e
}
}
})
ssc
}

/**
* Starts the embedded Jetty server that acts as the shutdown daemon.
* @param port the port to expose
* @param ssc the streaming context
*/
def daemonHttpServer(port: Int, ssc: StreamingContext) = {
val server = new Server(port)
val context = new ContextHandler();
context.setContextPath("/close/" + Demo.getClass.getSimpleName.replace("$", ""));
context.setHandler(new CloseStreamHandler(ssc))
server.setHandler(context)
server.start()
}
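// Once the job is running, a plain HTTP GET against the handler registered above stops the stream
// gracefully, e.g. (host and port are whatever was passed to daemonHttpServer):
//   curl http://192.168.1.XX:3443/close/Demo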

/**
* Handles the HTTP request that gracefully stops the stream.
* @param ssc the streaming context
*/
class CloseStreamHandler(ssc: StreamingContext) extends AbstractHandler {
override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit = {
LOGGER.warn("Shutting down......")
ssc.stop(true, true) //graceful stop: let in-flight batches finish first
response.setContentType("text/html; charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
val out = response.getWriter();
out.println("close success");
baseRequest.setHandled(true);
LOGGER.warn("Shutdown complete.....")
}
}

def conf() = {
val props = new Properties()
//the file must be on the classpath (under the resources folder)
val is = getClass.getResourceAsStream("/config.properties")
props.load(new BufferedInputStream(is))
props.putAll(System.getProperties);
System.setProperties(props)
props
}

}

object StreamingUtils {
def getStreamingContext(name: String): StreamingContext = {
val sparkConf = new SparkConf() //create the SparkConf
val osName = System.getProperty("os.name");
if (osName != null && osName.startsWith("Windows")) {
System.setProperty("user.name", "root");
sparkConf.set("spark.executor.memory", "512M")
sparkConf.set("spark.executor.cores", "1")
sparkConf.setMaster("local[2]")
}
sparkConf.setAppName(name)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//sparkConf.set("spark.sql.shuffle.partitions", "6")
sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true")//graceful shutdown on JVM exit
sparkConf.set("spark.streaming.backpressure.enabled","true")//enable backpressure to smooth traffic spikes
sparkConf.set("spark.streaming.backpressure.initialRate","5000")//cap on records read in the first batch
sparkConf.set("spark.streaming.kafka.maxRatePerPartition","2000")//max records per second read from each Kafka partition
val ssc = new StreamingContext(sparkConf, Seconds(Integer.parseInt(System.getProperty("streaming.batchduration"))))
ssc
}
}


object SerialiableUtils {
//serialize an object to a URL-encoded string
def objectSerialiable(obj: Object): String = {
val byteArrayOutputStream = new ByteArrayOutputStream()
val objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
objectOutputStream.writeObject(obj)
var serISO = byteArrayOutputStream.toString("ISO-8859-1")
var serUTF8 = java.net.URLEncoder.encode(serISO, "UTF-8")
objectOutputStream.close()
byteArrayOutputStream.close()
serUTF8;
}

//deserialize the string back into an object
def objectDeserialization[T](serStr: String): T = {
val redStr = java.net.URLDecoder.decode(serStr, "UTF-8")
val byteArrayInputStream = new ByteArrayInputStream(redStr.getBytes("ISO-8859-1"))
val objectInputStream = new ObjectInputStream(byteArrayInputStream)
val newObj = objectInputStream.readObject()
objectInputStream.close()
byteArrayInputStream.close()
newObj.asInstanceOf[T]
}
}
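
// A quick round-trip check of SerialiableUtils (a hypothetical usage sketch, not part of the original job):
object SerialiableUtilsCheck {
def main(args: Array[String]): Unit = {
val original = new java.util.HashMap[String, java.lang.Long]()
original.put("topic-0", java.lang.Long.valueOf(42L))
//serialize to a URL-encoded ISO-8859-1 string, then decode it back into an object
val encoded = SerialiableUtils.objectSerialiable(original)
val decoded = SerialiableUtils.objectDeserialization[java.util.HashMap[String, java.lang.Long]](encoded)
assert(decoded.get("topic-0").longValue() == 42L)
}
}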

@SerialVersionUID(1L)
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
val callback = new Callback with Serializable { //Serializable so the whole sink can be shipped (e.g. broadcast) to executors
def onCompletion(metadata: RecordMetadata, e: Exception) {
if (e != null) {
println("kafka send Callback:" + metadata.toString())
e.printStackTrace()
}
}
}

def send(topic: String, key: K, value: V): Future[RecordMetadata] =
//producer.send(new ProducerRecord[K, V](topic, key, value))
producer.send(new ProducerRecord[K, V](topic, key, value), callback)

def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
import scala.collection.JavaConversions._

def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}

def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
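
// How KafkaSink is typically used from Spark (a sketch, not part of the original post): broadcast the
// sink once so that each executor lazily creates its own single producer, instead of serializing a live
// producer from the driver (the NotSerializableException workaround described in the class comment).
// The Demo object above takes the simpler route of an object-level lazy val; both approaches work.
// The object and parameter names below are illustrative only.
object KafkaWriteSketch {
import java.util.Properties
import org.apache.spark.streaming.dstream.DStream

def writeToKafka(stream: DStream[String], producerProps: Properties, topic: String): Unit = {
//ship the lightweight sink (not a producer) to the executors once
val sinkBroadcast = stream.context.sparkContext.broadcast(KafkaSink[String, String](producerProps))
stream.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
//runs on the executors; the first send builds the producer lazily, once per executor JVM
partition.foreach(message => sinkBroadcast.value.send(topic, message))
}
}
}
}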

object KafkaHelper {
lazy val log = org.apache.log4j.LogManager.getLogger(KafkaHelper.getClass)
lazy val zkClient = ZkUtils.createZkClient(System.getProperty("zookeeper.url"), Integer.MAX_VALUE, 10000)
lazy val zkUtils = ZkUtils.apply(zkClient, true)

val OFFSET_ROOT = "/spark/streaming/offset/"

/**
* Read the offsets stored in ZooKeeper. If offsets exist, return the partition -> offset map;
* otherwise return None.
* @param zkOffsetPath zk path where the offsets are stored
* @param topic topic name
* @return offset map or None
*/
private def readOffsets(zkOffsetPath: String, topic: String): Option[Map[TopicPartition, Long]] = {
//(offset string, zk metadata)
val (offsetsRangesStrOpt, _) = zkUtils.readDataMaybeNull(zkOffsetPath) //read the stored offsets from zk
offsetsRangesStrOpt match {
case Some(null) =>
None //nothing stored yet, return None
case Some(offsetsRangesStr) =>
//the topic's current partition count as seen in zk
val lastest_partitions = zkUtils.getPartitionsForTopics(Seq(topic)).get(topic).get
var offsets = offsetsRangesStr.split(",") //split on commas into an array
.map(s => s.split(":")) //split each entry on ":" into partition and offset
.map { case Array(partitionStr, offsetStr) => (new TopicPartition(topic, partitionStr.toInt) -> offsetStr.toLong) } //build the final form
.toMap //return a Map
//fewer stored entries than partitions means the topic was expanded
if (offsets.size < lastest_partitions.size) {
//all of the previously known partition ids
val old_partitions = offsets.keys.map(p => p.partition).toArray
//the newly added partitions are the difference of the two sets
val add_partitions = lastest_partitions.diff(old_partitions)
if (add_partitions.size > 0) {
log.warn("Found newly added Kafka partitions: " + add_partitions.mkString(","))
add_partitions.foreach(partitionId => {
offsets += (new TopicPartition(topic, partitionId) -> 0L) //new partitions start from offset 0
log.warn("Added new partition id " + partitionId + " ....")
})
}
} else {
log.info("No newly added Kafka partitions: " + lastest_partitions.mkString(","))
}
Some(offsets) //return the Map
case None =>
None //nothing stored at the path, return None
}
}

/**
* Save the offsets of each batch to ZooKeeper.
* @param zkOffsetPath zk path where the offsets are stored
* @param offsetRanges offset ranges of the current batch
*/
private def saveOffsets(zkOffsetPath: String, offsetRanges: Array[OffsetRange]): Unit = {
//alternatively, the offsets could be taken straight from the batch rdd:
//val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//encode every OffsetRange into the string format stored in zk: partition1:offset1,partition2:offset2,...
val offsetsRangesStr = offsetRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")
log.debug("Saving offsets: " + offsetsRangesStr)
//persist the final string to zk
zkUtils.updatePersistentPath(zkOffsetPath, offsetsRangesStr)
}
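
// Example of the stored value (hypothetical offsets): a three-partition topic would be saved as
// "0:1200,1:1187,2:1210", which readOffsets above parses back into
// Map(TopicPartition(topic, 0) -> 1200L, TopicPartition(topic, 1) -> 1187L, ...).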

private def getZKPath(name: String): String = {
OFFSET_ROOT + name.replace("$", "")
}

def readOffset(name: String) = {
val path = getZKPath(name)
val e = zkClient.exists(path);
if (!e) {
zkClient.createPersistent(path, true);
}
val offset = readOffsets(path, System.getProperty("kafka.source.topics"))

if (offset.isEmpty) {
ju.Collections.emptyMap[TopicPartition, jl.Long]()
} else {
val offsetJavaMap = new ju.HashMap[TopicPartition, java.lang.Long]()
offset.get.foreach(f => {
offsetJavaMap.put(f._1, f._2)
})
offsetJavaMap
}
}

def saveOffset(name: String, offsetRanges: Array[OffsetRange]): Unit = {
saveOffsets(getZKPath(name), offsetRanges)
}

def getConsumerConfigs(): ju.HashMap[String, Object] = {
val kafkaParams = new ju.HashMap[String, Object]
kafkaParams.put("bootstrap.servers", System.getProperty("kafka.brokers"))
kafkaParams.put("group.id", System.getProperty("kafka.source.group.id"))
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.putAll(strToJMap(System.getProperty("kafka.consumer.configs", ""), ",", "="))
kafkaParams
}

def getProducerConfigs(): Properties = {
val kafkaProps = new Properties()
kafkaProps.put("bootstrap.servers", System.getProperty("kafka.brokers"))
kafkaProps.put("group.id", System.getProperty("kafka.source.group.id"))
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("retries", "1000"); // number of retries after a failed request
// No compression is used by default; configuring a suitable codec greatly reduces network traffic
// and the broker's storage load. Valid values are none, gzip and snappy. Compression works best with
// batching: the more messages per batch, the better the compression ratio.
kafkaProps.put("compression.type", "snappy");
// By default a request is sent immediately even if the buffer has spare room. linger.ms adds a
// short wait (in milliseconds) so the buffer can fill up further; a 1 ms delay per send noticeably
// reduces the number of requests hitting the Kafka brokers.
kafkaProps.put("linger.ms", "1");
// Only the leader's acknowledgement is required here; for stronger guarantees set this to "all".
kafkaProps.put("acks", "1");
// Kafka can keep several in-flight requests per connection, which lowers overhead but can reorder
// messages when retries happen; the default is 5.
kafkaProps.put("max.in.flight.requests.per.connection", "5")
kafkaProps.putAll(strToJMap(System.getProperty("kafka.producer.configs", ""), ",", "="))
kafkaProps
}

def getTopic(): ArrayList[String] = {
val topics = System.getProperty("kafka.source.topics")
var topicsList = new ArrayList[String]()
topics.split(",").foreach { t =>
topicsList.add(t.trim)
}
topicsList
}

/**
* Converts a configuration string to a java.util.Map.<br>
* If in is null, returns null.
* @param in: "a=1,b=2,c=3"
* @param spliterPairs: ","
* @param spliterKV: "="
* @return: java.util.Map("a"->"1","b"->"2","c"->"3")
*/
def strToJMap(in: String, spliterPairs: String, spliterKV: String): java.util.Map[String, String] = {
if (in == null) {
return null
}
var map = new java.util.HashMap[String, String]()
in.split(spliterPairs).foreach { pairs =>
val arr = pairs.split(spliterKV)
if (arr.length == 2) {
map.put(arr(0).trim(), arr(1).trim())
}
}
map
}
}
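
The whole example is driven by a config.properties file on the classpath, which Demo.conf() loads into the JVM system properties. A minimal sketch of the keys the code reads (every value below is a placeholder):

# batch interval in seconds
streaming.batchduration=10
zookeeper.url=192.168.1.XX:2181
kafka.brokers=192.168.1.XX:9092
kafka.source.topics=demo_topic
kafka.source.group.id=demo_group
# optional extras, parsed by KafkaHelper.strToJMap as comma-separated key=value pairs
kafka.consumer.configs=auto.offset.reset=earliest,session.timeout.ms=30000
kafka.producer.configs=batch.size=16384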

