Kafka SASL_SSL and a simple Java consumer and producer
Copyright notice: this is an original post by the author; do not reproduce without permission. Contact: xiafanli1987@163.com https://blog.csdn.net/xiafanli_python/article/details/78992139

Kafka SSL certificate generation script

Updated the SSL certificate script and added a certificate export for the Python client.

#!/bin/bash

#################################
BASE_DIR=/data/kafka_2.11_auth
CERT_OUTPUT_PATH="$BASE_DIR/server_cert"
CLIENT_CERT_OUTPUT_PATH="$BASE_DIR/client_cert"
PASSWORD=graph123
KEY_STORE="$CERT_OUTPUT_PATH/kafka.keystore"
CLIENT_KEY_STORE="$CLIENT_CERT_OUTPUT_PATH/client.kafka.keystore"
TRUST_STORE="$CERT_OUTPUT_PATH/kafka.truststore"
CLIENT_TRUST_STORE="$CLIENT_CERT_OUTPUT_PATH/client.kafka.truststore"
CLUSTER_NAME=graphca
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert"
CLUSTER_CERT_FILE="$CERT_OUTPUT_PATH/${CLUSTER_NAME}-cert"
CLIENT_CLUSTER_CERT_FILE="$CERT_OUTPUT_PATH/${CLUSTER_NAME}-cert-client"
DAYS_VALID=999
DNAME="CN=graph,OU=graph, O=graph, L=graph, ST=graph, C=CN"
#################################
mkdir -p $CERT_OUTPUT_PATH
mkdir -p $CLIENT_CERT_OUTPUT_PATH

echo "1:创建秘钥和证书"
keytool -keystore $KEY_STORE -alias $CLUSTER_NAME -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $PASSWORD -keypass $PASSWORD -dname "$DNAME"

keytool -keystore $CLIENT_KEY_STORE -alias $CLUSTER_NAME -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $PASSWORD -keypass $PASSWORD -dname "$DNAME"

echo "2:创建CA证书信任库"
openssl req -new -x509 -keyout ${CERT_OUTPUT_PATH}/ca-key -out "$CERT_AUTH_FILE" -days "$DAYS_VALID" \
-passin pass:"$PASSWORD" -passout pass:"$PASSWORD" \
-subj "/C=CN/ST=graph/L=graph/O=graph/CN=CN"
keytool -keystore "$TRUST_STORE" -alias CARoot \
-import -file "$CERT_AUTH_FILE" -storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
keytool -keystore "$CLIENT_TRUST_STORE" -alias CARoot \
-import -file "$CERT_AUTH_FILE" -storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
echo "3:CA证书签名"
keytool -keystore "$KEY_STORE" -alias "$CLUSTER_NAME" -certreq -file "$CLUSTER_CERT_FILE" \
-storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
keytool -keystore "$CLIENT_KEY_STORE" -alias "$CLUSTER_NAME" -certreq -file "$CLIENT_CLUSTER_CERT_FILE" \
-storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt

openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CLUSTER_CERT_FILE" \
-out "${CLUSTER_CERT_FILE}-signed" \
-days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"
openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CLIENT_CLUSTER_CERT_FILE" \
-out "${CLIENT_CLUSTER_CERT_FILE}-signed" \
-days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"

echo "4:创建集群证书到keystore"
keytool -keystore "$TRUST_STORE" -alias "${CLUSTER_NAME}" -import -file "${CLUSTER_CERT_FILE}-signed" \
-storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
keytool -keystore "$CLIENT_TRUST_STORE" -alias "${CLUSTER_NAME}" -import -file "${CLIENT_CLUSTER_CERT_FILE}-signed" \
-storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
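Before wiring the stores into the broker, they can be sanity-checked with keytool and openssl. This is just a sketch: the paths and password follow the variables above, and the hostname/port are assumptions matching the broker config below.

# list the entries in the server keystore
keytool -list -keystore /data/kafka_2.11_auth/server_cert/kafka.keystore -storepass graph123
# once the broker is up, verify the TLS handshake on the SASL_SSL port
openssl s_client -connect dggts10012035:9094 </dev/null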

kafka server.properties

broker.id=100
host.name=dggts10012035
advertised.host.name=dggts10012035
delete.topic.enable=true
auto.create.topics.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data01/kafka_2.11_auth/data
kafka.logs.dir=logs
num.partitions=3
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=dggts10012041:2181/kafka_ssl_test3
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0
# SSL authentication section
listeners=PLAINTEXT://:9092,SASL_SSL://:9094
# If not using SSL, change the advertised.listeners below
advertised.listeners=PLAINTEXT://dggts10012035:9092,SASL_SSL://dggts10012035:9094
#advertised.listeners=PLAINTEXT://dggts10012035:9092,SASL_PLAINTEXT://dggts10012035:9094
# If not using SSL, comment out all six of the ssl.* lines below
ssl.keystore.location=/data01/kafka_2.11_auth/server_cert/kafka.keystore
ssl.keystore.password=graph123
ssl.truststore.location=/data01/kafka_2.11_auth/server_cert/kafka.truststore
ssl.truststore.password=graph123
ssl.key.password=graph123
ssl.client.auth=required

# ACL entry point
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# SASL section
# If using SSL, switch the line below to the commented SASL_SSL variant
security.inter.broker.protocol=SASL_PLAINTEXT
#security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# super user
super.users=User:admin
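The broker reads the JAAS file shown in the next section through the java.security.auth.login.config system property. A minimal way to pass it when starting the broker (a sketch; the file location is an assumption for this setup):

# the path to kafka_cluster_jaas.conf is assumed; adjust to where the file actually lives
export KAFKA_OPTS="-Djava.security.auth.login.config=/data01/kafka_2.11_auth/config/kafka_cluster_jaas.conf"
bin/kafka-server-start.sh config/server.properties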

Certificate export for the Python client

keytool -list -rfc -keystore client.kafka.keystore
keytool -export -alias graphca -keystore client.kafka.keystore -rfc -file certificate.pem
keytool -v -importkeystore -srckeystore client.kafka.keystore -srcalias graphca -destkeystore cert_and_key.p12 -deststoretype PKCS12
openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes
Copy the block between -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- from the output into key.pem.

keytool -exportcert -alias graphca -keystore client.kafka.keystore -rfc -file CARoot.pem
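With the exported PEM files, a kafka-python SASL_SSL consumer would in principle be configured as follows. This is only a sketch: as noted in the kafka-python section below, the author did not get SASL_SSL working, and the file paths here are assumptions.

from kafka import KafkaConsumer

# ssl_cafile / ssl_certfile / ssl_keyfile point at the PEM files exported above (assumed paths)
consumer = KafkaConsumer("test1",
                         group_id='test-group',
                         bootstrap_servers='dggts10012036:9094',
                         security_protocol="SASL_SSL",
                         sasl_mechanism="PLAIN",
                         sasl_plain_username='reader',
                         sasl_plain_password='reader',
                         ssl_cafile='CARoot.pem',
                         ssl_certfile='certificate.pem',
                         ssl_keyfile='key.pem'
)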

kafka_cluster_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_reader="reader"
    user_writer="writer";
};

Client side:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="reader"
    password="reader";
};
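The Java demos below load a file named kafka_client_jaas_admin.conf that the original post does not show; presumably it mirrors the reader file with the admin credentials (an assumption):

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};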

# ACL commands
# grant the reader user Read permission on the consumer group
kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=dggts10012041:2181/kafka_ssl_test3 \
--add --allow-principal User:reader --operation Read --group test-group
# grant the reader user Read permission on the topic
kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=dggts10012041:2181/kafka_ssl_test3 \
--add --allow-principal User:reader --operation Read --topic test
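The demos also produce to the topic; a matching Write permission for the writer user would be granted the same way (a sketch following the pattern above; admin is a superuser and needs no ACL):

kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=dggts10012041:2181/kafka_ssl_test3 \
--add --allow-principal User:writer --operation Write --topic test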

kafka-python code
Because SASL_SSL could not be tested successfully (exporting the certificate in PKCS12 format failed), the Python demo only uses SASL_PLAINTEXT access control.

from kafka import KafkaProducer, KafkaConsumer


print "step1"
consumer = KafkaConsumer("test1",
                         group_id = 'test-group',
                         bootstrap_server='dggts10012036:9094',
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism="PLAIN",
                         sasl_plain_username='admin',
                         sasl_plain_password='admin'
)

print "step2"
producer=KafkaProducer(bootstrap_server='dggts10012036:9094',
                       security_protocol="SASL_PLAINTEXT",
                       sasl_mechanism="PLAIN",
                       sasl_plain_username='admin',
                       sasl_plain_password='admin'
)

print "step3"
producer.send("test1", bytes("Hello World"))
producer.flush()

print "step4"

for msg in consumer:
    print msg.value

Java demo
The Java version uses kafka-clients 1.0.0.
The Maven dependency is as follows:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

producer

package com.study.kafka;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class sendMessageKafka {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "dggts10012036:9094");
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
        System.setProperty("java.security.auth.login.config", "d:\\kafka_2.11_auth\\client_cert\\kafka_client_jaas_admin.conf");
        /*
            auth section
            the commented-out first line enables SASL_SSL;
            the remaining commented lines below are used together with SSL
         */
//        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
//        producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "path of client.kafka.keystore");
//        producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "graph123");
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "path of client.kafka.truststore");
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "graph123");
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        try {
            for (int i = 0; i < 100; i++) {
                // send() is asynchronous; the record is batched and sent in the background
                producer.send(new ProducerRecord<String, String>("test1", Integer.toString(i), Integer.toString(i)));
                System.out.println("Finish");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            producer.close();
        }

    }
} 

consumer

package com.study.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.Arrays;
import java.util.Properties;


public class getMessageKafka {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dggts10012035:9094");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        System.setProperty("java.security.auth.login.config", "d:\\kafka_2.11_auth\\client_cert\\kafka_client_jaas_admin.conf");
        /*
            auth section
            the commented-out first line enables SASL_SSL;
            the remaining commented lines below are used together with SSL
         */
//        consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
//        consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "path of client.kafka.keystore");
//        consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "graph123");
//        consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "path of client.kafka.truststore");
//        consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "graph123");
//        consumerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        consumerProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList("test1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record: records) {
                System.out.printf("offset = %d, key= %s , value = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}
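The ACLs can also be verified without the demo code by running the console consumer with equivalent client settings (a sketch; the client.properties file below is an assumption for this setup):

# client.properties (assumed)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group

export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server dggts10012036:9094 --topic test1 \
--consumer.config client.properties --from-beginning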