设为首页 加入收藏

TOP

DataX二次开发——新增HiveReader插件(一)
2023-07-26 08:16:44 】 浏览:66
Tags:DataX 新增 HiveReader 插件

一、研发背景

    DataX官方开源的版本支持HDFS文件的读写,并没有支持基于JDBC的Hive数据读写,很多时候一些数据同步不太方便,比如在读取Hive之前先执行一些sql、读取一些Hive的视图数据、或者在数据同步时执行一段固定的SQL,将SQL执行结果写入下游等各种场景,实际上还是需要Hive插件来支持。而在实际工作中,我们也遇到了类似的一些情况需要二次开发DataX以支持此类场景。本插件已在生产环境稳定运行一年有余,现分享给大家,如有问题也可联系我。

二、HiveReader插件介绍

    hivereader插件比较简单,共有三个类,两个配置文件。其中:

  • HiveReader:实现DataX框架核心方法,是具体逻辑。
  • HiveReaderErrorCode:继承了DataX框架的ErrorCode类,是用于统一异常处理DataXException类中调用,具体是新增了一个枚举值。
  • HiveConnByKerberos:是在检测到Hive具备Kerberos认证要求时,进行认证的工具类。
  • plugin.json:DataX插件固定的配置文件,用于指定插件的入口类。
  • plugin_job_template.json:二次开发插件,一般需要提供一下具体的使用方式,此json文件即为HiveReader插件的配置方式说明。

 

   2.1 HiveReader类

    首先是HiveReader类,需要注意的是一些常量或枚举值,需要自行添加,其中DataBaseType枚举类中,需要新增Hive枚举项并添加Hive的驱动类全路径,具体见注释,另外就是Kerberos认证相关的几个配置,一个是keytab的路径,一个是krb5.conf的路径,另外一个是principle的值。

package com.alibaba.datax.plugin.reader.hivereader; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.rdbms.util.DataBaseType; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.security.authentication.util.KerberosName; import java.lang.reflect.Field; import java.util.List; import static com.alibaba.datax.common.base.Constant.DEFAULT_FETCH_SIZE;//2048,可根据条件自己取值
import static com.alibaba.datax.common.base.Key.FETCH_SIZE; // 参数名:"fetchSize"
 @Slf4j public class HiveReader extends Reader { //此处需现在com.sinosig.plumber.rdbms.util.DataBaseType枚举类中添加Hive类型,内容为:Hive("hive2", "org.apache.hive.jdbc.HiveDriver"),
    private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive; public static class Job extends Reader.Job { private Configuration originalConfig = null; private CommonRdbmsReader.Job commonRdbmsReaderJob; @Override public void init() { this.originalConfig = getPluginJobConf(); Boolean haveKerberos = this.originalConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { log.info("检测到kerberos认证,正在进行认证"); org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); String kerberosKeytabFilePath =  this.originalConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH); String kerberosPrincipal =  this.originalConfig.getString(Key.KERBEROS_PRINCIPAL); String krb5Path =  this.originalConfig.getString(Key.KRB5_CONF_FILE_PATH); hadoopConf.set("hadoop.security.authentication", "kerberos"); hadoopConf.set("hive.security.authentication", "kerberos"); hadoopConf.set("hadoop.security.authorization", "true"); System.setProperty("java.security.krb5.conf",krb5Path); refreshConfig(); HiveConnByKerberos.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf,krb5Path); } this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE); this.originalConfig = commonRdbmsReaderJob.init(originalConfig); } @Override public void preCheck() { this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE_TYPE); } @Override
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java常用类之String源码分析 下一篇我是如何用CAP和BASE两个基础理论..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目