TOP

kafka java客户端调用问题
2019-01-06 02:25:28 】 浏览:1113
Tags:kafka java 客户端 调用 问题

1、如下所示，客户端依赖包的版本必须与 Kafka 服务端版本对应，否则报错如下

15:45:04.132 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -1.  Fetching API versions.
15:45:04.133 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node -1.
SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.clients.NodeApiVersions]
java.lang.NullPointerException
	at org.apache.kafka.clients.NodeApiVersions.apiVersionToText(NodeApiVersions.java:167)
	at org.apache.kafka.clients.NodeApiVersions.toString(NodeApiVersions.java:134)
	at org.apache.kafka.clients.NodeApiVersions.toString(NodeApiVersions.java:120)
	at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:304)
	at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:276)
	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:230)
	at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:298)
	at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:208)
	at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:212)
	at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:103)
	at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:88)
	at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:48)
	at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:273)
	at ch.qos.logback.classic.Logger.callAppenders(Logger.java:260)
	at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:442)
	at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:433)
	at ch.qos.logback.classic.Logger.debug(Logger.java:511)
	at org.apache.kafka.clients.NetworkClient.handleApiVersionsResponse(NetworkClient.java:558)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:538)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:359)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:225)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126)
	at java.lang.Thread.run(Thread.java:748)

2、非本地调用：在 server.properties 文件中设置 advertised.host.name=服务器IP地址、advertised.port=9092、listeners=PLAINTEXT://0.0.0.0:9092

新版本listeners=PLAINTEXT://服务器内网地址:9092

advertised.listeners=PLAINTEXT://服务器公网地址:9092

配置不对报错如下:

15:52:27.651 [kafka-producer-network-thread | producer-1] DEBUG o.a.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with /172.17.92.57 disconnected
java.net.ConnectException: Connection timed out: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_131]
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_131]
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-2.0.0.jar:na]
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:152) ~[kafka-clients-2.0.0.jar:na]
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:471) [kafka-clients-2.0.0.jar:na]
	at org.apache.kafka.common.network.Selector.poll(Selector.java:425) [kafka-clients-2.0.0.jar:na]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) [kafka-clients-2.0.0.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.0.0.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-2.0.0.jar:na]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

3、后台运行kafka 添加-daemon参数

./kafka-server-start.sh -daemon ../config/server.properties

Java 消费端代码示例：

package com.cn.handle;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Kafka consumer example built on the legacy (pre-0.9) high-level consumer API,
 * which coordinates group membership and offsets through ZooKeeper.
 */
public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();

        // ZooKeeper connection (the legacy consumer coordinates through ZK)
        props.put("zookeeper.connect", "localhost:2181");

        // Consumer group this consumer belongs to
        props.put("group.id", "test-consumer-group");

        // ZK session/sync timeouts and offset auto-commit interval
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // "smallest" = start from the earliest available offset when no
        // committed offset exists for this group
        props.put("auto.offset.reset", "smallest");

        // Serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Blocks on the "zzbtest" topic stream and prints each received message;
     * logs a summary line once 100 messages have been consumed.
     */
    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // One stream (thread) for the topic
        topicCountMap.put("zzbtest", new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("zzbtest").get(0);
        ConsumerIterator<String, String> it = stream.iterator();

        int messageCount = 0;
        // hasNext() blocks until a message arrives, so this loop runs until
        // the process is stopped.
        while (it.hasNext()) {
            System.out.println(it.next().message());
            messageCount++;
            if (messageCount == 100) {
                System.out.println("Consumer端一共消费了" + messageCount + "条消息!");
            }
        }
        System.out.println("结束");
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}

消息发送端:

package com.cn.handle;


import java.io.IOException;
import java.util.Properties;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


/**
*kafka 消息发送
*
*/
/**
 * Kafka message producer example using the legacy (pre-0.8.2)
 * {@code kafka.javaapi.producer.Producer} API.
 */
public class SendDatatToKafka {

    public static void main(String[] args) {
        SendDatatToKafka s = new SendDatatToKafka();
        try {
            s.send("zzbtest", "jack", "rose");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends 1000 keyed messages to the given topic, roughly 100 ms apart.
     *
     * @param topic target Kafka topic
     * @param key   message key (used for partitioning)
     * @param data  payload prefix; the loop index is appended to each message
     * @throws IOException declared for interface compatibility with callers
     */
    public void send(String topic, String key, String data) throws IOException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            for (int i = 0; i < 1000; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sending rather than
                    // swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    break;
                }
                producer.send(new KeyedMessage<String, String>(topic, key, data + i));
            }
        } finally {
            // Always release the producer's network resources, even if a
            // send fails.
            producer.close();
        }
    }
}


kafka java客户端调用问题 https://www.cppentry.com/bencandy.php?fid=120&id=202705

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka与Logstash的数据采集对接 .. 下一篇kafka-connect遇到的问题