val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
// The underlying SparkContext can be accessed as ssc.sparkContext.
// Alternatively, build the StreamingContext on top of an existing SparkContext:
val sc = ...
val ssc = new StreamingContext(sc, Seconds(1))
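The batch output below appears to come from a socket-based word count. Here is a minimal sketch of such a job, assuming text is served on localhost:9999 (for example via nc -lk 9999); the object name, host, and port are illustrative, not taken from the original:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    // One record per line of text read from the TCP source
    val lines = ssc.socketTextStream("localhost", 9999)
    // Split lines into words and count occurrences within each 1-second batch
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // Print the first ten results of every batch
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Each print() call emits a timestamped block like the following: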
-------------------------------------------
Time: 1504665716000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665717000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665718000 ms
-------------------------------------------
(227.7202-1,2)
(created,2)
(offer,8)
(BUSINESS,11)
(agree,10)
(hereunder,,1)
(“control”,1)
(Grant,2)
(2.2.,2)
(include,11)
...
-------------------------------------------
Time: 1504665719000 ms
-------------------------------------------
...
-------------------------------------------
Time: 1504665739000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665740000 ms
-------------------------------------------
(under,1)
(Technology,1)
(distribution,2)
(http://hadoop.apache.org/core/,1)
(Unrestricted,1)
(740.13),1)
(check,1)
(have,1)
(policies,1)
(uses,1)
...
-------------------------------------------
Time: 1504665741000 ms
-------------------------------------------
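Batches that print only a timestamp header, such as several of those above, are intervals in which no new data arrived on the source.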
RDD Queue
package streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object QueueRdd {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Create the queue through which RDDs can be pushed to a QueueInputDStream
    // (SynchronizedQueue is deprecated in newer Scala versions; a mutable.Queue
    // guarded by synchronized blocks also works)
    val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
    // Create the QueueInputDStream and use it to do some processing
    val inputStream = ssc.queueStream(rddQueue)
    // Process the RDDs in the queue: count elements by their residue modulo 10
    val mappedStream = inputStream.map(x => (x % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    // Print the results
    reducedStream.print()
    // Start the computation
    ssc.start()
    // Create and push some RDDs into the queue
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    // Stop the StreamingContext programmatically once all RDDs have been pushed
    ssc.stop()
  }
}
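Since 1 to 300 contains every residue modulo 10 exactly 30 times, each non-empty batch should print ten pairs of the form (k,30); and because an RDD is pushed only every two seconds while the batch interval is one second, roughly every other batch is empty. The output (key order may vary) looks like:
-------------------------------------------
Time: ... ms
-------------------------------------------
(0,30)
(1,30)
...
(9,30)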
(2) Advanced data sources
Defining a custom Receiver
package streaming
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  // Called when the receiver starts. onStart() must not block,
  // so the socket is read on a separate thread
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }
  // Called when the receiver is stopped; the reading thread exits via isStopped()
  override def onStop(): Unit = {}
  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var inputText = reader.readLine()
      while (!isStopped() && inputText != null) {
        // Store each received line in Spark's memory
        store(inputText)
        inputText = reader.readLine()
      }
      reader.close()
      socket.close()
      // The connection was closed; ask Spark to restart the receiver
      restart("Trying to connect again")
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }
}
object CustomReceiver {
  def main(args: Array[String]): Unit = {
    // Create the configuration
    val conf = new SparkConf().setAppName("sparkStreaming").setMaster("local[*]")
    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // Receive lines from the socket through the custom receiver
    val lineDStream = ssc.receiverStream(new CustomReceiver("linux01", 5456))
    // Classic word count over each batch
    val wordDStream = lineDStream.flatMap(_.split(" "))
    val word2CountDStream = wordDStream.map((_, 1))
    val resultDStream = word2CountDStream.reduceByKey(_ + _)
    resultDStream.print()
    // Start the StreamingContext and wait for termination
    ssc.start()
    ssc.awaitTermination()
  }
}
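To exercise the receiver, assuming netcat is available on linux01: run nc -lk 5456 there, type a few lines, and per-word counts should appear in 5-second batches. If the connection is lost or cannot be established, restart() asks Spark to schedule the receiver again after a delay.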