设为首页 加入收藏

TOP

Window下kafka 单机SASL_PLAINTEXT加密及身份认证
2019-05-02 02:32:56 】 浏览:81
Tags:Window kafka 单机 SASL_PLAINTEXT 加密 身份认证
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u014088839/article/details/84582779

一:前情提要

SASL_PLAINTEXT是一种简单的用户名和密码认证机制,是一种kafka加密协议,PLAINTEXT是传输层协议

二:配置准备

1:JAVA_HOME

有JAVA_HOME的环境变量,且java版为1.8及以上,jdk目录无中文和空格;

2:KAFKA项目部署

下载解压KAFKA项目到一个无中文无空格的目录下。配置非加密下的KAFKA环境,参考Windows环境下kafka搭建

三:JAAS配置

KAFKA使用JAVA认证和授权服务(JAAS)进行SASL配置

1:为zookeeper配置JAAS

在config目录下创建文件kafka_zoo_jaas.conf,文件内容具体如下:

Server {
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="admin"
	password="admin"
	user_admin="admin";
};

创建了一个Server节点,其中

  • org.apache.kafka.common.security.plain.PlainLoginModule required是加密方式,用plain, 连接时必须身份验证。
  • username,password是zookeeper之间通讯的用户名和密码,
  • user_admin="admin"的结构是user_username="password",用户名是admin, 密码是admin,客户端连接到zookeeper时,使用这个用户名和密码。

2:为broker配置JAAS

在config目录下创建文件kafka_server_jaas.conf,具体内容如下:

Client {
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="admin"
	password="admin";
};

KafkaServer {
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="admin"
	password="admin"
	user_admin="admin"
	user_alice="alice";
};
  • Client节点,是broker连接zookeeper时的认证信息
  • KafkaServer节点:集群中,broker之间用节点中的username,password进行通讯
  • KafkaServer节点:客户端(producer,consumer)连接broker时用user_username="password"结构中的账号密码登录

3:为客户端(producer,consumer等)配置JAAS

在config目录下创建文件kafka_client_jaas.conf,文件内容具体如下:客户端用client节点信息,连接认证zookeeper后broker

Client {
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="admin"
	password="admin";
};

四、SASL配置

4.1:zookeeper的sasl配置

4.1.1:修改zookeeper.properties 修改后的文件内容为:主要就是新增后面三行

#数据文件目录
dataDir=./data/zookeeper
# 客户端连接时的端口
clientPort=2181

maxClientCnxns=0
#加密认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

4.1.2:修改zookeeper-server-start.bat

在文件中加入:这个目的是将kafka_zoo_jaas.conf将入zookeeper的jvm参数中

set KAFKA_OPTS=-Djava.security.auth.login.config=E:/demo/kafka/SASL_PLAINTEXT/broker/2181/kafka_2.12-1.1.1/config/kafka_zoo_jaas.conf

加入后,完整的zookeeper-server-start.bat信息如下

@echo off

IF [%1] EQU [] (
	echo USAGE: %0 zookeeper.properties
	EXIT /B 1
)
SetLocal

IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
    set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
    set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
set KAFKA_OPTS=-Djava.security.auth.login.config=E:/demo/kafka/SASL_PLAINTEXT/broker/2181/kafka_2.12-1.1.1/config/kafka_zoo_jaas.conf

"%~dp0kafka-run-class.bat" org.apache.zookeeper.server.quorum.QuorumPeerMain %*
EndLocal

4.1.3:启动zookeeper服务

在根目录下执行命令

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

出现到下面内容则为启动成功:

4.2:broker的sasl配置

4.2.1修改文件:server.properties 主要修改内容为:

listeners=SASL_PLAINTEXT://192.168.40.150:9091
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制 
sasl.enabled.mechanisms=PLAIN  
sasl.mechanism.inter.broker.protocol=PLAIN   
# 完成身份验证的类 
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。 
allow.everyone.if.no.acl.found=false
#超级管理员权限用户
super.users=User:admin

advertised.listeners=SASL_PLAINTEXT://192.168.40.150:9091

修改后,完整的server.properties文件为:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
port=9091
host.name=192.168.40.150
advertised.port=9091
advertised.host.name=192.168.40.150

############################# Socket Server Settings #############################

listeners=SASL_PLAINTEXT://192.168.40.150:9091

advertised.listeners=SASL_PLAINTEXT://192.168.40.150:9091

#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制 
sasl.enabled.mechanisms=PLAIN  
sasl.mechanism.inter.broker.protocol=PLAIN   
# 完成身份验证的类 
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。 
allow.everyone.if.no.acl.found=false
super.users=User:admin

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

############################# Log Basics #############################

log.dirs=./tmp/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

#log.flush.interval.messages=10000

#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

log.retention.hours=168

#log.retention.bytes=1073741824

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

############################# Zookeeper #############################

zookeeper.connect=192.168.40.150:2181

zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

group.initial.rebalance.delay.ms=0

4.2.2:修改kafka-server-start.bat 在bat文件中加入:

set KAFKA_OPTS=-Djava.security.auth.login.config=E:/demo/kafka/SASL_PLAINTEXT/broker/2181/kafka_2.12-1.1.1/config/kafka_server_jaas.conf

修改完后的kafka-server-start.bat完整信息为:

@echo off

IF [%1] EQU [] (
	echo USAGE: %0 server.properties
	EXIT /B 1
)

SetLocal
set KAFKA_OPTS=-Djava.security.auth.login.config=E:/demo/kafka/SASL_PLAINTEXT/broker/2181/kafka_2.12-1.1.1/config/kafka_server_jaas.conf
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
    set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
    rem detect OS architecture
    wmic os get osarchitecture | find /i "32-bit" >nul 2>&1
    IF NOT ERRORLEVEL 1 (
        rem 32-bit OS
        set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
    ) ELSE (
        rem 64-bit OS
        set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
    )
)
"%~dp0kafka-run-class.bat" kafka.Kafka %*
EndLocal

4.2.3 启动 broker

.\bin\windows\kafka-server-start.bat .\config\server.properties

出现这样的信息则为成功:

4.3:producer的sasl配置

4.3.1:修改producer.properties文件,在文件末尾新增一下内容:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

4.3.2:启动kafka-console-producer.bat

.\bin\windows\kafka-console-producer.bat --broker-list 192.168.40.150:9091 --topic testTopic  --producer.config  .\config\producer.properties

这里使用的已经创建好的topic(topicTest)。和以往的启用,多了--producer.config。下面这样则为成功

4.4 consumer的sasl配置

4.4.1:修改consumer.properties文件,在文件的末尾新增以下内容:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

4.4.2:启动kafka-console-consumer.bat

.\bin\windows\kafka-console-consumer.bat --bootstrap-server  192.168.40.150:9091 --topic testTopic --consumer.config .\config\consumer.properties

读取testTopic内容的信息,和以往的启用相比,多了--consumer.config。出现下面界面则为成功,打印刚刚producer新增的数据

注意:这里因为在producer.properties,consumer.properties文件中配置了sasl.jaas.config这个其实就是配置认证信息,除了这样配置以外,还可以像broker一样,在kafka-console-producer.bat, kafka-console-consumer.bat文件中加入:

set KAFKA_OPTS= -Djava.security.auth.login.config=file:%~dp0../../config/kafka_server_jaas.conf
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇004.Kafka消息存储和处理 下一篇sparkStreaming连接kafka(Direct..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目