E-MapReduce(简称EMR)从EMR-3.16.0版本开始支持Apache Flume。本文介绍如何通过命令方式,使用Flume同步EMR Kafka集群的数据至EMR Hadoop集群的Hive。
前提条件
- 已创建Hadoop集群,并且选择了Flume服务,详情请参见创建集群。
- 已创建Kafka集群,详情请参见创建集群。
说明
- 如果创建的是Hadoop高安全集群,消费标准Kafka集群的数据,需在Hadoop集群配置Kerberos认证,详情请参见兼容MIT Kerberos认证。
- 如果创建的是Kafka高安全集群,通过Flume将数据写入标准Hadoop集群,请参见 Kerberos Kafka Source。
- 如果创建的Hadoop集群和Kafka集群都是高安全集群,需配置跨域互信,详情请参见跨域互信,其它配置请参见跨域互信使用Flume。
同步Kafka数据至Hive
消费Kerberos Kafka source
消费高安全Kafka集群的数据时,需要完成额外的配置:
- 在Kafka集群配置Kerberos认证,将生成的test.keytab文件拷贝至Hadoop集群的/etc/ecm/flume-conf路径下,详情请参见兼容MIT Kerberos认证;将Kafka集群的/etc/ecm/has-conf/krb5.conf文件拷贝至Hadoop集群的/etc/ecm/flume-conf路径下。
- 配置flume.properties。
在
flume.properties中添加如下配置。a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
- 配置Kafka client。
- 在/etc/ecm/flume-conf下创建文件flume_jaas.conf。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/flume-conf/test.keytab" serviceName="kafka" principal="test@EMR.${realm}.COM"; };
${realm} 需要替换为Kafka集群的Kerberos realm。
${realm}获取方式:在Kafka集群执行命令
hostname
,得到形式为emr-header-1.cluster-xxx
的主机名,例如emr-header-1.cluster-123456
,其中数字串123456即为realm。 - 修改/etc/ecm/flume-conf/flume-env.sh。
初始情况下,
/etc/ecm/flume-conf/下没有
flume-env.sh 文件,需要拷贝
flume-env.sh.template并重命名为
flume-env.sh。在
flume-env.sh文件末尾添加如下内容。export JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/etc/ecm/flume-conf/krb5.conf" export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"
- 在/etc/ecm/flume-conf下创建文件flume_jaas.conf。
- 设置域名。
将Kafka集群各节点的长域名和IP的绑定信息添加到Hadoop集群的
/etc/hosts文件末尾。长域名的形式为
emr-header-1.cluster-xxxx。
说明 图中标注①表示的是Hadoop集群的域名;图中标注②表示新增加的Kafka集群域名。
跨域互信使用Flume
在配置了跨域互信后,其他配置如下:
- 在Kafka集群配置Kerberos认证,将生成的keytab文件test.keytab拷贝至Hadoop集群的/etc/ecm/flume-conf路径下,详情请参见兼容MIT Kerberos认证。
- 配置flume.properties。
在
flume.properties中添加如下配置。a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
- 配置Kafka client。
- 在/etc/ecm/flume-conf下创建文件flume_jaas.conf,内容如下。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/flume-conf/test.keytab" serviceName="kafka" principal="test@EMR.${realm}.COM"; };
${realm}替换为Kafka集群的Kerberos realm。
${realm}获取方式:在Kafka集群执行命令
hostname
,得到形式为emr-header-1.cluster-xxx
的主机名,例如emr-header-1.cluster-123456
,其中数字串123456即为realm。 - 修改/etc/ecm/flume-conf/flume-env.sh。
初始情况下,
/etc/ecm/flume-conf/下没有
flume-env.sh文件,需要拷贝
flume-env.sh.template并重命名为
flume-env.sh。在
flume-env.sh文件末尾添加如下内容。export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/ecm/flume-conf/flume_jaas.conf"
- 在/etc/ecm/flume-conf下创建文件flume_jaas.conf,内容如下。