设为首页 加入收藏

TOP

DataX二次开发——新增HiveReader插件(二)
2023-07-26 08:16:44 】 浏览:70
Tags:DataX 新增 HiveReader 插件
public List<Configuration> split(int adviceNumber) { return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber); } @Override public void post() { this.commonRdbmsReaderJob.post(originalConfig); } @Override public void destroy() { this.commonRdbmsReaderJob.destroy(originalConfig); } } public static class Task extends Reader.Task { private Configuration readerSliceConfig; private CommonRdbmsReader.Task commonRdbmsReaderTask; @Override public void init() { this.readerSliceConfig = getPluginJobConf(); this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId()); this.commonRdbmsReaderTask.init(this.readerSliceConfig); } @Override public void startRead(RecordSender recordSender) { int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE); this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize); } @Override public void post() { this.commonRdbmsReaderTask.post(readerSliceConfig); } @Override public void destroy() { this.commonRdbmsReaderTask.destroy(readerSliceConfig); } } /** 刷新krb内容信息 */ public static void refreshConfig() { try { sun.security.krb5.Config.refresh(); Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm"); defaultRealmField.setAccessible(true); defaultRealmField.set( null, org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm()); // reload java.security.auth.login.config javax.security.auth.login.Configuration.setConfiguration(null); } catch (Exception e) { log.warn( "resetting default realm failed, current default realm will still be used.", e); } } }

 2.2 HiveConnByKerberos类

    HiveConnByKerberos类比较简单,是一个通用的Kerberos认证的接口。

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.exception.PlumberException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;

@Slf4j
public class HiveConnByKerberos {
    public static void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5conf) {
        System.setProperty("java.security.krb5.conf",krb5conf);
        if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
            UserGroupInformation.setConfiguration(hadoopConf);
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
            }
            catch (Exception e) {

                log.error("kerberos认证失败");
                String message = String.format("kerberos认证失败,请检查 " +
                                "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]",
                        kerberosKeytabFilePath, kerberosPrincipal);
                e.printStackTrace();
                throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
            }
        }
    }
}

 

2.3 HiveReaderErrorCode类

    HiveReaderErrorCode类,主要就是集成ErrorCode类,并添加一个枚举项,这块可直接在ErrorCode类添加,也可使用此类,为固定写法。

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum HiveReaderErrorCode
        implements ErrorC
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java常用类之String源码分析 下一篇我是如何用CAP和BASE两个基础理论..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目