Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same consumer group so that each one reads a unique set of partitions for the topics.
kafka.consumer.group.id
flume
Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group.
kafka.topics
-
Comma-separated list of topics the Kafka consumer will read messages from.
kafka.topics.regex
-
Regex that defines the set of topics the source is subscribed to. This property has higher priority than kafka.topics and overrides kafka.topics if it exists.
batchSize
1000
Maximum number of messages written to the Channel in one batch.
batchDurationMillis
1000
Maximum time (in ms) before a batch is written to the Channel. The batch is written whenever the first of the size limit or the time limit is reached.
backoffSleepIncrement
1000
Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors.
maxBackoffSleep
5000
Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors.
useFlumeEventFormat
false
By default events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink, or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve any Flume headers sent on the producing side.
setTopicHeader
true
When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.
topicHeader
topic
Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property so as to avoid sending the message back to the same topic in a loop.
migrateZookeeperOffsets
true
When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled. Check the Kafka documentation for details.
kafka.consumer.security.protocol
PLAINTEXT
Set to SASL_PLAINTEXT, SASL_SSL or SSL if connecting to Kafka using some level of security. See below for additional info on secure setup.
more consumer security props
If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on consumer.
Other Kafka Consumer Properties
–
These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
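For comparison, here is a minimal sketch of a source that subscribes to a fixed topic list and tunes the batching properties from the table above; the topic names, the custom group id and the batch values are placeholders chosen for illustration:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
# explicit consumer group instead of the default flume group
tier1.sources.source1.kafka.consumer.group.id = custom.g.id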
3. Security and Kafka Source
Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
As of now, data encryption is solely provided by SSL/TLS.
Setting kafka.consumer.security.protocol to any of the following values means:
SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
SASL_SSL - Kerberos or plaintext authentication with data encryption
SSL - TLS based encryption with optional authentication.
4. TLS and Kafka Source
Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.
Example configuration with server side authentication and data encryption:
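A sketch of what such a configuration could look like; the broker addresses (assumed to listen for SSL on port 9093), topic name, truststore path and password are placeholders, and if a JVM-wide truststore is already configured the two ssl.truststore.* lines can be omitted:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.channels = channel1
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.security.protocol = SSL
# optional, a JVM-wide (global) truststore can be used instead of the two properties below
a1.sources.source1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password = <password to access the truststore>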
Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties:
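For example, for the consumer used by this source (the property is simply the standard Kafka consumer setting carrying the kafka.consumer. prefix):

a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS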
If client side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA, which in turn is trusted by the Kafka brokers.
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
If the keystore and key use different password protection, then the ssl.key.password property will provide the required additional secret for the consumer keystore:
a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
5. Kerberos (MIT's authentication system) and Kafka Source
To use Kafka source with a Kafka cluster secured with Kerberos, set the consumer.security.protocol properties noted above for consumer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file’s “KafkaClient” section. “Client” section describes the Zookeeper connection if needed. See Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
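For example (a sketch; the file paths below are placeholders):

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

The source itself then only needs the security protocol and, assuming GSSAPI with the commonly used broker service name kafka, the matching SASL settings in the agent configuration, for example:

a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka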
Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in Kafka documentation of SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
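A sketch of such a JAAS file, assuming a keytab-based Kerberos login (GSSAPI); the keytab path, principal and realm are placeholders:

// "Client" is only needed for Zookeeper offset migration or other secure components
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

// "KafkaClient" is the section read by the Kafka consumer used by this source
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};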