设为首页 加入收藏

TOP

Spark分析SRS日志,以及Zookeeper和Kafka备忘录
2019-03-19 13:08:47 】 浏览:66
Tags:Spark分析 SRS日志 以及 Zookeeper Kafka 备忘录
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/winlinvip/article/details/46880385

Spark分析SRS日志

本文描述了如何使用Spark分析SRS的日志,环境搭建参考HDFSSpark,下载SRS实例日志

Cluster

HDFS和Spark的URI是hdfs://hdfs.winlin.cn:9000spark://hdfs.winlin.cn:7077,集群的配置如下:

Server OS Role CPU Mem(MB)
Namenode Ubuntu14 HDFS(Namenode)
Spark(Master)
4 768
Datanode0 Ubuntu14 HDFS(Datanode)
Spark(Slave)
4 768
Datanode1 Ubuntu14 HDFS(Datanode)
Spark(Slave)
4 768
Datanode2 Ubuntu14 HDFS(Datanode)
Spark(Slave)
4 768

HDFS

下载日志后,将文件上传到hdfs:

curl http://ossrs.net/srs.latest/objs/srs.log.zip -o srs.log.zip &&
unzip -q srs.log.zip && 
hdfs dfs -mkdir -p /srs &&
hdfs dfs -copyFromLocal srs.log /srs &&
rm -f srs.log

Spark可以通过URI读这个文件hdfs://hdfs.winlin.cn:9000/srs/srs.log

Spark

新建Scala工程SrsLogAnalysis,配置参考Spark,代码如下:

import org.apache.spark.{SparkContext, SparkConf}

object SrsLog {
  def main(args: Array[String]):Unit = {
    val conf = new SparkConf().setAppName("SrsLog")
    if (args.length > 0 && args(0) == "local") {
      conf.setMaster("spark://localhost:7077")
    } else {
      conf.setMaster("spark://hdfs.winlin.cn:7077")
    }
    val spark = new SparkContext(conf)

    val srs = spark.textFile("hdfs://hdfs.winlin.cn:9000/srs/srs.log")
    val serverStart = srs.filter(line => line.contains("srs(simple-rtmp-server)")).count()
    val ffmpegStart = srs.filter(line => line.contains("start ffmpeg")).count()
    val publishCount = srs.filter(line => line.contains("type=publish")).count()
    val playCount = srs.filter(line => line.contains("type=Play")).count()

    println("Analysis SRS log using Spark")
    println("\tserver start: " + serverStart)
    println("\tffmepg start: " + ffmpegStart)
    println("\tpublish: " + publishCount)
    println("\tplay: " + playCount)
  }
}

在一个Datanode时运行的结果:

winlin:SrsLogAnalysis winlin$ time spark-submit SrsLogAnalysis.jar 
Analysis SRS log using Spark
    server start: 7
    ffmepg start: 44754
    publish: 41035
    play: 124

real    0m15.052s
user    0m6.924s
sys 0m0.562s

TcUrls

加几行代码统计tcUrls的访问次数:

    val tcurls = srs.filter(s => s.contains("tcUrl=")).filter(s => s.contains("pageUrl"))
      .map(s => s.substring(0, s.indexOf("pageUrl=")).substring(s.indexOf("tcUrl=")))
    val vhosts: Array[(String, Int)] = tcurls.map(s => (s, 1)).reduceByKey((va, vb) => va + vb).sortBy(p => p._2, false).take(100)
    println("Top100 tcUrls:")
    vhosts.foreach((p: (String, Int)) => println("\t" + p._1 + p._2))

运行的结果如下:

winlin:SrsLogAnalysis_jar winlin$ spark-submit SrsLogAnalysis.jar 
Top100 tcUrls:
    tcUrl=rtmp://mp3:19351/livevhost=mp3, 8158
    tcUrl=rtmp://aonly:19351/livevhost=aonly, 8158
    tcUrl=rtmp://aac:19351/livevhost=aac, 8158
    tcUrl=rtmp://ts:19351/livevhost=ts, 8158
    tcUrl=rtmp://hlsmp3:19351/livevhost=hlsmp3, 8158
    tcUrl=rtmp://127.0.0.1:19351/livevhost=hls, 205
    tcUrl=rtmp://ossrs.net:19351/live...vhost...hls, 111
    tcUrl=rtmp://flv:19351/livevhost=flv, 37
    tcUrl=rtmp://ossrs.net:19351/live, 9
    tcUrl=rtmp://ossrs.net:19351/livevhost=flv, 6
    tcUrl=rtmp://ossrs.net:19351/livevhost=ts, 2
    tcUrl=rtmp://ossrs.net:19351/appkey=35c9b402c12a7246868752e2878f7e0e&vhost=bandcheck.srs.com, 1
    tcUrl=rtmp://ossrs.net:19351/live...vhost...hls/, 1

Streaming

Spark Streaming用到的nc -lk 9999命令貌似有时候不好使,可以自己写一个服务器,接收连接后就返回hello, world!的,python代码如下:

# -*- coding:utf-8 -*-
from socket import *
import time

sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET,SO_REUSEADDR, 1)

sock.bind(("0.0.0.0", 9999))
sock.listen(10)
print "server ready"

while True:
    conn,addr = sock.accept()
    print "got client: ", addr
    while True:
        conn.send("hello, world!\n")
        time.sleep(0.01)

保存为server.py,启动这个例子的命令(运行在hdfs.winlin.cn那台机器上):

python server.py

启动Spark Streaming的实例的命令:

run-example streaming.NetworkWordCount hdfs.winlin.cn 9999 --master spark://hdfs.winlin.cn:7077

结果如下:

-------------------------------------------
Time: 1437539779000 ms
-------------------------------------------
(hello,,114)
(world!,114)

-------------------------------------------
Time: 1437539780000 ms
-------------------------------------------
(hello,,76)
(world!,76)

-------------------------------------------
Time: 1437539781000 ms
-------------------------------------------
(hello,,93)
(world!,93)

Zookeeper

Spark需要用到Zookeeper,Spark Streaming一般可以用Kafka集群,Kafka也需要用到Zookeeper,因此Zookeeper对于Spark集群的Master热备是个非常重要也是常用的一种方式。

Zookeeper是一个开发库,用来协调集群的多个节点,譬如配置管理,选举Master等。Zookeeper提供开发的库,有Java和C的,所以它是一个框架而不是一个工具。

Zookeeper的配置是conf/zoo.cfg,可以拷贝conf/zoo_sample.cfg

cp conf/zoo_sample.cfg conf/zoo.cfg

详细的Zookeeper的配置参考Zookeeper管理文档,譬如改变侦听的地址可以配置clientPortAddress

配置完zoo.cfg就可以启动了:

bin/zkServer.sh start

Kafka

Kafka是一个消息集群,它的设计确实很独特,主要的系统概念是:

  • Producer:生产者,将消息发布到brokertopicpartitionbroker就是节点服务器,topic就是个分类,partition就是分类的一个分区,消息会被写入到一个分区(复制到其他broker)。
  • Broker:节点服务器,负责处理消息。producerconsumer都是它的客户端,Kafka就是个CS模型。broker可以通过zookeeper进行集群管理。
  • Consumer: 消费者,从broker读取producer发布的消息。同时还有ConsumerGroup,一个消息会同时发布到所有的ConsumerGroup;但是topic某个partition的消息永远只会发布每个ConsumerGroup的某个特定的Consumer;这样就可以支持queuepub-sub消息模型。

Queue模型:即一个topic的消息,会被多个consumer读取,每个consumer读取的消息都不一样,主要是用于负载均衡的消息处理。Kafka实现这个模型的方法是:

  • 只有一个ConsumerGroup:所有消息都会发布到这个唯一的ConsumerGroup。
  • 这个ConsumerGroup有多个Consumer:每个Consumer都在处理消息,ConsumerGroup内部是负载均衡的。

Pub-Sub模型:即Publisher-Subscriber模型,发布者-订阅者模型,即一个消息被多个consumer处理。Kafka实现这个模型的方法是:

  • 有多个ConsumerGroup:所有消息都会发给所有的ConsumerGroup一份。
  • 每个ConsumerGroup只有一个Consumer:所有消息都被这个Consumer处理了一次。

理解了Kafka的几个关键概念,就容易理解它提到的各种结构了。

Kafka集群

Kafka集群官方配置参考:https://kafka.apache.org/documentation.html#quickstart

配置中有几个是必须修改:

  • broker.id,必须全局唯一,是broker的id标识,zookeeper选举也是用的这个id。
  • port,侦听的端口,如果一台机器安装多个broker,就需要修改端口。
  • log.dir,日志目录,如果一台机器安装多个broker,就需要修改这个目录。

Consumer的启动是一样的,因为Consumer连接的是Zookeeper。

对于Kafka集群,有几点是不一样的:

  • –replication–factor,复制因子,这个决定了消息复制到哪些broker。
  • topic按照--replication-factor分配broker,一旦分配了就不会改变了,除非手动修改。
  • 集群中的其他broker可以提供metadata,说明Leader和Follower是哪台,客户端会连接到这个broker。
  • Producer连接的--broker-list,可以是集群中的任意broker,可以只连接一台。

也就是说,topic会在集群中选择replication-factor指定数目的broker,选举出Leader和Follower。Producer必须(或者通过其他broker获取topic的集群)将消息发送到topic所在的broker。

查看Topic信息时,可以看到详细内容:

winlin:~ winlin$ ~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic srs
Topic:srs   PartitionCount:1    ReplicationFactor:2 Configs:
    Topic: srs  Partition: 0    Leader: 9094    Replicas: 9094,9095 Isr: 9094,9095

这个信息说明了:

  • PartitionCount,只有一个分区。
  • ReplicationFactor,复制因子是2,也就是信息会复制到2台broker上去。
  • Partition:下面是每个分区一条信息,显示了这个分区的信息。
  • Leader:说明这个分区的leader Broker是9094.
  • Replicas:消息会复制到9094和9095上面。
  • Isr:正在复制的broker,也就是topic的集群有哪些broker在工作。

同时,如果Producer连接到9093也是可以的,这台broker挂掉了就没法发送消息了:

tailf ~/srs/objs/srs.log| ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic srs

或者指定两台broker,错误时切下一台,这样可以有热备:

tailf ~/srs/objs/srs.log| ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094 --topic srs

注意:譬如上面的topic分配的是broker是9094和9095,就算指定多台broker,topic的broker也必须上线才能发送消息,也就是说,Producer可以连接到9093但是它只是将Leader信息返回给了Producer。

Kafka集群会给分区做负载均衡,分配到不同的broker,如果创建多个分区,每个分区都会做均衡:

winlin:~ winlin$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic srs --partitions 5 --replication-factor 2
Created topic "srs".
winlin:~ winlin$ ~/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic srs
Topic:srs   PartitionCount:5    ReplicationFactor:2 Configs:
    Topic: srs  Partition: 0    Leader: 9094    Replicas: 9094,9093 Isr: 9094,9093
    Topic: srs  Partition: 1    Leader: 9095    Replicas: 9095,9094 Isr: 9095,9094
    Topic: srs  Partition: 2    Leader: 9093    Replicas: 9093,9095 Isr: 9093,9095
    Topic: srs  Partition: 3    Leader: 9094    Replicas: 9094,9095 Isr: 9094,9095
    Topic: srs  Partition: 4    Leader: 9095    Replicas: 9095,9093 Isr: 9095,9093

Kafka Partition

Kafka一个topic可以有多个Partition即分区,这样做可以支持负载均衡和扩展,譬如某个topic的消息特别多,可以多建几个分区,不同的分区会分配到不同的broker,这样在写入时写到不同的分区,就写到了不同的Leader。

这个对于弹性系统很有用,譬如系统某topic前期只有10台broker,后来这个topic并发消息很多,broker增加到30台,这样就增加partition指定到新的broker了。

每个Partition的消息都会发给ConsumerGroup的某个Consumer,这样可以实现多个ConsumerGroup同时处理这条消息,也就是topic的所有消息都会分配给每个ConsumerGroup处理。

ConsumerGroup会选择一个Consumer处理消息,这样实现了负载均衡,可以多个Consumer处理一个ConsumerGroup的消息。

显然一个消息不会重复发送个多个Partition,这会导致若只有一个Consumer会重复收到这一个消息(每个Partition的消息都会给这个唯一的Consumer)。

Producer发送消息的机制是:

  • 选择broker。
  • 选择topic的partition。
  • 将消息写入partition。
  • 将这个partition的消息复制给其他的broker。
  • 将消息发送给consumer group中的某个consumer。

这样热备和负载均衡都支持了。其中ConsumerGroup的Consumer数目,不应该比Partition的数目多,因为如果多了,就有些Consumer拿不到数据了。

备注:默认kafka-console-consumer.sh启动时ConsumerGroup是随机的,所以启动的Consumer总能收到所有的消息。可以启动时指定配置文件,配置文件中指定ConsumerGroup,命令如下:

~/kafka/bin/kafka-console-consumer.sh \
    --zookeeper localhost:2181 --topic srs \
    --consumer.config ~/kafka/config/consumer.properties 

新建一个topic,有2个Partitions,2个Replication:

winlin:~ winlin$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic srs --partitions 2 --replication-factor 2
Created topic "srs".
winlin:~ winlin$ ~/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic srs
Topic:srs   PartitionCount:2    ReplicationFactor:2 Configs:
    Topic: srs  Partition: 0    Leader: 9095    Replicas: 9095,9093 Isr: 9095,9093
    Topic: srs  Partition: 1    Leader: 9093    Replicas: 9093,9094 Isr: 9093,9094

启动三个Producer,向两个Partition写入数据:

~/srs/research/librtmp/objs/srs_rtmp_dump -r rtmp://127.0.0.1:1935/live/livestream >l0.log &
~/srs/research/librtmp/objs/srs_rtmp_dump -r rtmp://127.0.0.1:1935/live/livestream >l1.log &
~/srs/research/librtmp/objs/srs_rtmp_dump -r rtmp://127.0.0.1:1935/live/livestream >l2.log &
tailf l0.log| ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic srs &
tailf l1.log| ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic srs &
tailf l2.log| ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic srs &

新开三个Console,分别启动三个Consumer,属于同一个ConsumerGroup,可以看到只有两个Consumer拿到了数据:

~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic srs --consumer.config ~/kafka/config/consumer.properties
~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic srs --consumer.config ~/kafka/config/consumer.properties
~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic srs --consumer.config ~/kafka/config/consumer.properties

可见实例的console-producer是随机写入Partition的。每个Partion的消息都会发给ConsumerGroup的某个Consumer。

Kafka Producer

Kafka自带的Console Producer本机使用没有问题,但是在Internet上用有问题。错误信息如下:

[2015-08-27 10:33:07,356] WARN Failed to send producer request with correlation id 5 to broker 9092 with data for partitions [winlin,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Kafka自带的Console Consumer也有问题,错误信息如下:

[2015-08-27 10:35:54,450] WARN Fetching topic metadata with correlation id 26 for topics [Set(winlin)] from broker [id:9093,host:BrokerServer,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

原因是Kafka的主机名配置是从系统获取的,有时候更改了没有获取到。可以在配置文件中配置:

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

貌似是依照hostname来定位服务器的,而不仅仅是--broker-list

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark sql基础与示例 下一篇Spark 2.1.0的运行模式

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目