设为首页 加入收藏

TOP

kafka安装与使用远程代码进行访问     --附踩坑记录
2019-02-11 14:31:23 】 浏览:19
Tags:kafka 安装 使用 远程 代码 进行 访问   记录
版权声明:转接请说明,毕竟纯手打 https://blog.csdn.net/apologizetm/article/details/82713177

转载请注明出处、毕竟纯手打 谢谢


# kafka安装和使用java连接远程服务器进行消息的生成与消费

### 首先要使用kafka,要有jdk和zookeeper的环境
### 本文在阿里云的centos7环境上进行
### jdk版本选择的是1.8.0_181
### zookeeper的版本是3.4.12
### kafka的版本是2.12-1.1.1
### 关于kafka命令的介绍 本文不介绍了 只介绍怎么搭建一个kafka单点服务器 以及怎么使用代码 远程连接kafka服务器


## 下载地址
kafka下载地址 :http://kafka.apache.org/downloads
zookeeper下载地址:https://zookeeper.apache.org/
jdk下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html


##操作步骤
1、首先 使用tar命令对jdk进行解压
tar -zxvf tar -zxvf jdk-8u181-linux-x64.tar.gz
目录下面会多出一个jdk1.8.0_181 进入里面去 使用pwd命令查看绝对路径 并且复制这个路径
最后进行jdk环境变量的配置
编辑 vim /etc/profile文件
在文件后面加上:
export JAVA_HOME=(刚才pwd命令看到的路径)
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/ lib/ tools.jar
export PATH=$PATH:${JAVA_HOME}/bin

最后使用source /etc/profile 刷新文件

使用java -version 查看环境变量是否配置成功

2、成功之后进行zookeeper的安装

使用 tar -zxvf zookeeper-3.4.12.tar.gz 接下下载好的zookeeper安装包

将zookeeper下的/conf/zookeeper.example改名成zoo.cfg
使用mv 和cp命令都可以 然后vim这个文件 加上下面两行
dataLogDir=/tmp/zookeeper-log #日志路径
quorumListenOnAllIPs=true #在阿里云的服务器上保证外网可以访问到 刚开始没设置这个折腾了好久
3、最后,安装kafka
使用 tar -zxvf kafka_2.12-1.1.1.tgz 解压下载好的kafka
cd 到解压后的文件里面去 编辑配置文件 vim config/server.properties
加上下面几行
listeners=PLAINTEXT://:9092
advertised.host.name=阿里云服务器公网ip #
advertised.port=9092

将zookeeper.connect的值改为阿里云的公网ip

#### 至此,所有的环境的安装已经完成,下面使用kafka的命令进行消息的生成和消费
首先cd到zookeeper的bin目录下 使用 ./zkServer.sh start 启动zookeeper
再cd到kafka的bin目录下 使用 ./kafka-server-start.sh ../config/server.properties 启动kafka

新建一个会话或者打开一个新的终端
这时候使用jps命令 可以看到 Kafka和QuorumPeerMain表示启动全部成功,下面创建一个主题
cd到kafka的bin目录下面,执行
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -- partitions 1 --topic Hello-world

输出Created topic "Hello-world". 表示topic创建成功
使用./kafka-topics.sh --list --zookeeper localhost:2181 查看主题的列表
输出里面会含有Hello-world

下面进行消息的生产和消费
先启动生产者 ./kafka-console-producer.sh --broker-list 阿里云公网ip:9092 --topic Hello- world
会出现一个 > 类似于交互界面 这时候就可以生产消息了

启动消费者 ./kafka-console-consumer.sh --zookeeper 阿里云公网ip:2181 --topic Hello- world --from-beginning

这时候当生产者生产消息的时候 消费者这边就可以看到了


### 在服务器上面进行消息的生产和消费就完成了 下面介绍怎么使用java代码进行远程连接kafka服务器
### 这个地方真的踩了好多好多坑、有次晚上下班搞到了快两点 百度、谷歌、维基、Stack Overflow 能找解决问题的地方都找了浪费了好多不必要的时间


首先、新建一个Maven工程(此处不再多描述),在pom文件中加入kafka的依赖

        <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.0</version>
    </dependency>



新建一个KafkaProducerDemo和KafkaConsumerDemo类(名字可以自定义):
话不多说 上代码

KafkaProducerDemo类:

public class KafkaProducerDemo {
public static void main(String[] args) {
//创建properties文件
Properties properties = new Properties();
//设置kafka服务器地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云公网ip:9092");
//设置key进行序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//设置value进行序列化

public class KafkaProducerDemo {
  public static void main(String[] args) {
    //创建properties文件
     Properties properties = new Properties();
   //设置kafka服务器地址
   properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云公网ip:9092");
   //设置key进行序列化
   properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
   //设置value进行序列化
   properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
   //创建消息生产者
   KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
   //创建消息实体 制定主题、key、value
    ProducerRecord<String,String> record = new ProducerRecord<>("Hello-world","haha","from java client");
   //发送消息
    producer.send(record);
   System.out.println("消息发送成功");
   //关闭生产者
    producer.close();
    
 }
}




KafkaConsumerDemo类:

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        //新建配置文件
        Properties properties = new Properties();
        //设置kafka服务器地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"47.106.218.182:9092");
        //设置key的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //设置value的反序列化
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //设置groupid
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //创建消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Arrays.asList("Hello-world"));

        while (true) {
            //消费消息
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)

                System.out.println("消息的主题是:" + record.topic()+",消息的key是:" + record.key()+",消息的value是:"+record.value());
        }
    }
}






上面就是连接kafka远程服务器代码


**但是上述过程做完之后还是不能正确运行、这个地方折腾了好久、最后在哪里看到解决的办法记不大清了
就是要阿里云服务器服务安全设置里面加个规则 将2181和9092端口开放就可以,但是我中间也使用命令的方式
关闭了防火墙、没什么用,不知道什么鬼。 搞得我头皮发麻**








编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka's Metadata In ZooKeep.. 下一篇Error while fetching metadata w..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }