1. Spark Streaming: TCP source
Maven dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
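Only spark-core and spark-streaming are needed for this TCP example; the spark-streaming-kafka-0-10 artifact is used by the Kafka source in section 2.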
Program:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object wc {
  def main(args: Array[String]): Unit = {
    // Spark configuration
    val conf = new SparkConf().setMaster("local[3]").setAppName("wc")
    // Streaming context with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    // Input source: a TCP socket on localhost:9999
    val dstream1 = ssc.socketTextStream("localhost", 9999)
    // Split each line into words and count them per batch
    val dstream2 = dstream1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    dstream2.print()
    // Start the streaming context and wait for termination
    ssc.start()
    ssc.awaitTermination()
  }
}
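To try this locally, you can feed text into port 9999 with a tool such as netcat (for example, nc -lk 9999) and watch the per-batch word counts printed to the console every two seconds.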
2. Spark Streaming: Kafka source
Version 1.5.2 (receiver-based API):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object wc_kafka {
  // Update function for updateStateByKey: for each word, add the counts
  // seen in this batch to the previously accumulated total
  def updateFunc(iterator: Iterator[(String, Seq[Int], Option[Int])]): Iterator[(String, Int)] = {
    iterator.flatMap {
      case (word, counts, state) =>
        Some(counts.sum + state.getOrElse(0)).map(n => (word, n))
    }
  }

  def main(args: Array[String]): Unit = {
    // ZooKeeper quorum, consumer group, topics, threads per topic, checkpoint directory
    val Array(zkQuorum, groupId, topics, numThreads, checkpointDir) = Array(
      "localhost:2181", "g1", "t1,topic1", "1", "check-point2")
    // Create the SparkConf and set the application name
    val conf = new SparkConf().setAppName("wc_kafka").setMaster("local[3]")
    // Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir) // checkpointing is required by updateStateByKey
    // Pull data from Kafka (receiver-based stream); each message's value is a line of text
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    // Split lines into words and keep a running count per word
    val wc = lines.flatMap(_.split(" ")).map((_, 1))
    val result = wc.updateStateByKey(updateFunc _, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Print the results to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
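For reference, updateStateByKey also has a simpler per-key overload that takes the batch values and the previous state for a single key. A minimal sketch of the same running count, building on the wc DStream from the example above, might look like:
    // Per-key form of updateStateByKey: add this batch's counts to the stored total
    val result2 = wc.updateStateByKey((values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0)))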
Version 2.1.0 (direct API):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
Code:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object wc_kafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Create the streaming input source (direct stream, no receiver)
    val topics = Array("t1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // Print the (key, value) pair of each record
    val ds2 = stream.map(record => (record.key, record.value))
    ds2.print()
    // Start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }
}
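Because enable.auto.commit is set to false, the consumer group's offsets are not stored automatically. A minimal sketch of committing them back to Kafka after each batch, using the stream created above (the processing inside foreachRDD is just a placeholder, and this would be registered before ssc.start()), might look like:
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      // Capture the offset ranges of this batch before processing
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the rdd here ...
      // Asynchronously commit this batch's offsets back to Kafka
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }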