设为首页 加入收藏

TOP

KafkaStreams-Java-API操作-06
2019-05-14 14:20:19 】 浏览:11
Tags:KafkaStreams-Java-API 操作 -06

Kafka Streams

Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。

Kafka Streams特点

1)功能强大

高扩展性,弹性,容错

2)轻量级

无需专门的集群

一个库,而不是框架

3)完全集成

100%的Kafka 0.10.0版本兼容

易于集成到现有的应用程序

4)实时性

毫秒级延迟

并非微批处理

窗口允许乱序数据

允许迟到数据

为什么要有Kafka Stream

当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用广泛,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。

既然Apache Spark与Apache Storm拥用如此多的优势,那为何还需要Kafka Stream呢?笔者认为主要有如下原因。

第一,Spark和Storm都是流式处理框架,而Kafka Stream提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而Kafka Stream作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。

第二,虽然Cloudera与Hortonworks方便了Storm和Spark的部署,但是这些框架的部署仍然相对复杂。而Kafka Stream作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。

第三,就流式处理系统而言,基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了Kafka,此时使用Kafka Stream的成本非常低。

第四,使用StormSpark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。但是Kafka作为类库不占用系统资源。

第五,由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。

第六,由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并行度

案例需求:

实时处理单词带有”>>>”前缀的内容。例如输入”atguigu=>>>ximenqing”,最终处理成“ximenqing”

发送数据到first第一个topic,然后写程序对数据进行清洗,再发送到第二个topic,second,然后再进行消费.

导入依赖

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.11.0.0</version>
        </dependency>


    </dependencies>

主程序

package com.buba.kafka.stream;

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {

    public static void main(String[] args) {

        // 定义输入的topic
        String from = "first";
        // 定义输出的topic
        String to = "second";

        // 设置参数
        Properties settings = new Properties();
        //应用名称,随便起
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        //连接kafka主机名
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-senior01.buba.com:9092");

        StreamsConfig config = new StreamsConfig(settings);

        // 构建拓扑
        TopologyBuilder builder = new TopologyBuilder();

        //addSource:参数1 给数据源起个名称,参数2 topic的名称
        builder.addSource("SOURCE", "first")
                //addProcessor:参数1 给加工过程起个名称,参数2具体的实现过程,参数3.上一级的名称,也就是上面的source
                .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

                    @Override
                    public Processor<byte[], byte[]> get() {
                        // 具体分析处理
                        return new LogProcessor();
                    }
                }, "SOURCE")
                //addSink:参数1 输出名称 ,参数2 输出到下一个topic名称 ,参数3 上一级的名称,也就是处理过程名称.
                .addSink("SINK", "second", "PROCESS");

        // 创建kafka stream
        KafkaStreams streams = new KafkaStreams(builder, config);

        // 开启流
        streams.start();
    }
}

数据清洗业务逻辑

package com.buba.kafka.stream;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogProcessor implements Processor<byte[], byte[]> {
	
	private ProcessorContext context;

	// 初始化
	@Override
	public void init(ProcessorContext context) {
		this.context = context;
	}

	// 业务逻辑处理
	@Override
	public void process(byte[] key, byte[] value) {
		String input = new String(value);
		
		// 如果包含“>>>”则只保留该标记后面的内容
		if (input.contains(">>>")) {
			input = input.split(">>>")[1].trim();
			// 输出到下一个topic
			context.forward(key, input.getBytes());
		}else{
			context.forward(key, value);
		}
	}

	@Override
	public void punctuate(long timestamp) {
		// 时间戳处理
	}

	@Override
	public void close() {
		// 关闭资源
	}
}

启动主程序,然后在1节点上开启first的发送者.发送消息进行测试.

bin/kafka-console-producer.sh --broker-list hadoop-senior01.buba.com:9092 --topic first

2节点上开启second的消费者

bin/kafka-console-consumer.sh --zookeeper hadoop-senior02.buba.com:2181 --from-beginning --topic second

打成jar包在linux上进行测试.

在pom文件里添加上这段,修改main的主函数类名

<build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.buba.kafka.stream.Application</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

打好包后上传到linux上,直接运行就可以了,然后再进行相应的消息测试.


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka史上最详细原理总结   .. 下一篇Error while fetching metadata w..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }