TOP

kafka+spark streaming代码实例(pyspark+python)
2018-11-29 11:18:30 】 浏览:769
Tags:kafka spark streaming 代码 实例 pyspark python

一、系统准备

1.启动zookeeper:bin/zkServer.cmd start

2.启动kafka:bin/kafka-server-start.sh -daemon config/server.properties

3.启动spark:sbin/start-all.sh

数据来源:http://files.grouplens.org/datasets/movielens/ml-100k.zip

流程:kafka读取user数据集并生产数据流——spark streaming 计算每个职业人数——计算结果存入MySQL

二、kafka读取user数据集并生产数据流,1秒生产1条记录。

先创建topic:

bin/kafka-topics.sh--create --zookeeper 192.168.26.247:2181--replication-factor2--partitions1--topic txt

验证topic:bin/kafka-topics.sh--list --zookeeper 192.168.26.247:2181

bin/kafka-topics.sh--describe --zookeeper192.168.26.247:2181--topic txt


from kafka import KafkaProducer  
from kafka import KafkaConsumer  
from kafka.errors import KafkaError  
import time  
def main():  
    ##生产模块  
    producer = KafkaProducer(bootstrap_servers=['192.168.26.247:9092'])  
    with open('/home/hadoop/ml-100k/u.user','r') as f:  
        for line in f.readlines():  
            time.sleep(1)  
            producer.send("txt",line)  
            print line  
           #producer.flush()  
  
if __name__ == '__main__':  
    main()  
保存txt.py运行结果如下:


spark streaming 消费并计算数据,并将结果存入数据库

from pyspark import SparkContext  
from pyspark import SparkConf  
from pyspark.streaming import StreamingContext  
from pyspark.streaming.kafka import KafkaUtils,TopicAndPartition  
import MySQLdb  
def start():  
    sconf=SparkConf()  
    sconf.set('spark.cores.max',3)  
    sc=SparkContext(appName='txt',conf=sconf)  
    ssc=StreamingContext(sc,5)  
    brokers ="192.168.26.247:9092,192.168.26.246:9092"  
    topic='txt'  
    start = 70000  
    partition=0  
    user_data = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"metadata.broker.list":brokers})  
    #fromOffsets 设置从起始偏移量消费  
    #user_data = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"metadata.broker.list":brokers},fromOffsets={TopicAndPartition(topic,partition):long(start)})  
    user_fields = user_data.map(lambda line: line[1].split('|'))  
    gender_users = user_fields.map(lambda fields: fields[3]).map(lambda gender: (gender,1)).reduceByKey(lambda a,b: a+b)  
    user_data.foreachRDD(offset)#存储offset信息  
    gender_users.pprint()  
    gender_users.foreachRDD(lambda rdd: rdd.foreach(echo))#返回元组    
    ssc.start()  
    ssc.awaitTermination()  
offsetRanges = []  
def offset(rdd):  
    global offsetRanges  
    offsetRanges = rdd.offsetRanges()  
def echo(rdd):  
    zhiye = rdd[0]  
    num = rdd[1]  
    for o in offsetRanges:  
        topic = o.topic   
        partition = o.partition  
        fromoffset = o.fromOffset  
        untiloffset = o.untilOffset  
    #结果插入MySQL  
    conn = MySQLdb.connect(user="root",passwd="******",host="192.168.26.245",db="test",charset="utf8")  
    cursor = conn.cursor()  
    sql = "insert into zhiye(id,zhiye,num,topic,partitions,fromoffset,untiloffset) \  
                       values (NULL,'%s','%d','%s','%d','%d','%d')" % (zhiye,num,topic,partition,fromoffset,untiloffset)  
    cursor.execute(sql)  
    conn.commit()  
    conn.close()   
   
if __name__ == '__main__':  
    start()  
三、向集群submit

bin/spark-submit --master spark://192.168.26.245:7077 --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar python/txt.py

运行结果



数据库部分数据:



WEB显示数据:



kafka+spark streaming代码实例(pyspark+python) https://www.cppentry.com/bencandy.php?fid=120&id=191431

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇RocketMQ与kafka对比(18项差异).. 下一篇kafka的go版本api使用