import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.Properties;
/**
 * Minimal Kafka consumer demo.
 *
 * <p>Connects to the broker configured under {@code bootstrap.servers}, subscribes to
 * {@link #TOPIC} as a member of consumer group {@code group-1}, and prints the offset
 * and value of every record it receives to standard output. The loop never terminates
 * on its own; stop the process to end the demo.
 */
public class KafkaComsumerFirst {
    /** Name of the topic this demo subscribes to. */
    private final static String TOPIC = "first";

    /**
     * Builds the consumer configuration, subscribes to {@link #TOPIC}, and polls forever.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.122.132:9092");
        prop.put("group.id", "group-1");
        // Offsets are committed automatically, once per second.
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        // auto.offset.reset=earliest means consume from the beginning; latest consumes only the newest records.
        prop.put("auto.offset.reset", "latest");
        //prop.put("session.timeout.ms", "30000");
        //prop.put("partition.assignment.strategy", "range");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees the consumer (and its network connections) is
        // closed if the poll loop is ever left through an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            ArrayList<String> topics = new ArrayList<>();
            topics.add(TOPIC);
            // Group-managed subscription: partitions are assigned by the group coordinator.
            consumer.subscribe(topics);

            while (true) {
                // Wait up to 1000 ms for new records before looping again.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset:" + record.offset());
                    System.out.println("value:" + record.value());
                }
            }
        }
    }
}
出现的问题:
在 Linux 虚拟机内使用 Shell 脚本可以成功创建 Topic 并发送消息。
但是在虚拟机外部通过 Java API 无法发送消息。
具体原因(以下为 Kafka 官方文档对 advertised.listeners 配置项的说明):
Hostname and port the broker will advertise to producers and consumers. If not set, it uses the value for “listeners” if configured. Otherwise, it will use the value returned from java.net.InetAddress.getCanonicalHostName().
也就是说,未配置该项时,broker 向客户端通告的可能是虚拟机的主机名;外部机器无法解析该主机名,因此连接失败。
解决方法:
在 config/server.properties 中,将
#advertised.listeners=PLAINTEXT://your.host.name:9092
修改为
advertised.listeners=PLAINTEXT://192.168.84.136:9092
(其中 192.168.84.136 为虚拟机的 IP,请替换为实际地址;修改后需重启 broker 才能生效)