Custom Delimiters in Hive (with a Detailed Hadoop and Hive Installation)
When a data file exported to HDFS or to the local file system needs to be loaded directly into Hive, it sometimes contains special characters. With the given field delimiter or the default line delimiter, the data that ends up in Hive may not be what we expect, so we need to define custom delimiters for Hive.
The basic principle: when Hive imports a file from HDFS (or a local file), it by default calls Hadoop's TextInputFormat (from hadoop-mapreduce-client-core-2.7.7.jar) to parse the input. So we only need to write our own TextInputFormat class whose getRecordReader() method returns a custom LineRecordReader, then set the encoding, field separator, and record separator in the Hive CLI, and finally specify our custom TextInputFormat class as the INPUTFORMAT when creating the table.

1. Environment Preparation

The environment is CentOS 7 with the JDK already installed. Hive depends on Hadoop, so Hadoop and Hive need to be installed first. This article uses Hadoop 2.7.7 and Hive 1.2.2.

1.1 Hadoop Installation

1. Download
wget http://archive.apache.org/dist/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz
2. Extract
tar -zxvf hadoop-2.7.7.tar.gz -C /opt/
3. Edit the hadoop-env.sh configuration file
In the freshly extracted Hadoop root directory, run vim etc/hadoop/hadoop-env.sh and set JAVA_HOME (around line 27):
export JAVA_HOME=/usr/local/jdk1.8.0_151
4. Edit the core-site.xml configuration file
In the Hadoop root directory, run vim etc/hadoop/core-site.xml. Here node1 is the hostname mapped to this node. The local directories used in the following configuration files can be created in advance, e.g. hdfs, tmp, and yarn under /opt/data/hadoop/ (see the sketch after this file).
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://node1:8020</value>
    </property>
    <property>
      <name>hadoop.tmp.dir</name>
      <value>/opt/data/hadoop/tmp</value>
   </property>
    <property>
      <name>fs.checkpoint.period</name>
      <value>3600</value>
   </property>
   <!-- Required when using Beeline -->
   <!-- Required when using Sqoop -->
   <property>
      <name>hadoop.proxyuser.root.groups</name>
      <value>*</value>
   </property>
   <property>
      <name>hadoop.proxyuser.root.hosts</name>
      <value>*</value>
   </property>
</configuration>
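For reference, a minimal sketch for creating the local directories referenced in core-site.xml, hdfs-site.xml, and yarn-site.xml below (the paths are assumed to be the ones used throughout this article):
mkdir -p /opt/data/hadoop/tmp
mkdir -p /opt/data/hadoop/hdfs/{name,data,namesecondary}
mkdir -p /opt/data/hadoop/yarn/{local,logs}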
5. Edit the hdfs-site.xml configuration file
In the Hadoop root directory, run vim etc/hadoop/hdfs-site.xml:
<configuration>
   <property>
      <name>dfs.namenode.name.dir</name>
      <value>/opt/data/hadoop/hdfs/name</value>
   </property>
   <property>
      <name>dfs.datanode.data.dir</name>
      <value>/opt/data/hadoop/hdfs/data</value>
   </property>
   <property>
      <name>dfs.namenode.checkpoint.dir</name>
      <value>/opt/data/hadoop/hdfs/namesecondary</value>
   </property>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   <property>
      <name>dfs.namenode.http-address</name>
      <value>node1:50070</value>
   </property>
   <property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>node1:50090</value>
   </property>
   <property>
      <name>dfs.webhdfs.enabled</name>
      <value>true</value>
   </property>
   <property>
      <name>dfs.permissions</name>
      <value>false</value>
   </property>
</configuration>
6. Edit the mapred-site.xml configuration file
In the Hadoop root directory, first make a copy of the template: cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml,
then edit it: vim etc/hadoop/mapred-site.xml
<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
   <!-- jobhistory properties -->
   <property>
      <name>mapreduce.jobhistory.address</name>
      <value>node1:10020</value>
   </property>
   <property>
      <name>mapreduce.jobhistory.webapp.address</name>
      <value>node1:19888</value>
   </property>
</configuration>
7. Edit the yarn-site.xml configuration file
In the Hadoop root directory, run vim etc/hadoop/yarn-site.xml:
<configuration>

<!-- Site specific YARN configuration properties -->

   <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>node1</value>
   </property>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
   <property>
      <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
      <value>org.apache.hadoop.mapred.ShuffleHandler</value>
   </property>

   <property>
      <name>yarn.nodemanager.local-dirs</name>
      <value>/opt/data/hadoop/yarn/local</value>
   </property>
   <property>
      <name>yarn.nodemanager.remote-app-log-dir</name>
      <value>/opt/data/hadoop/yarn/logs</value>
   </property>

   <property>
      <name>yarn.resourcemanager.address</name>
      <value>${yarn.resourcemanager.hostname}:8032</value>
   </property>
   <property>
      <name>yarn.resourcemanager.scheduler.address</name>
      <value>${yarn.resourcemanager.hostname}:8030</value>
   </property>
   <property>
      <name>yarn.resourcemanager.webapp.address</name>
      <value>${yarn.resourcemanager.hostname}:8088</value>
   </property>
   <property>
      <name>yarn.resourcemanager.webapp.https.address</name>
      <value>${yarn.resourcemanager.hostname}:8090</value>
   </property>
   <property>
      <name>yarn.resourcemanager.resource-tracker.address</name>
      <value>${yarn.resourcemanager.hostname}:8031</value>
   </property>
   <property>
      <name>yarn.resourcemanager.admin.address</name>
      <value>${yarn.resourcemanager.hostname}:8033</value>
   </property>

   <property>
      <name>yarn.log-aggregation-enable</name>
      <value>true</value>
   </property>
   <property>
      <name>yarn.log.server.url</name>
      <value>http://node1:19888/jobhistory/logs/</value>
   </property>
   <property>
      <name>yarn.nodemanager.vmem-check-enabled</name>
      <value>false</value>
   </property>
</configuration>
8. Edit the slaves configuration file
In the Hadoop root directory, run vim etc/hadoop/slaves and add the worker nodes to the file:
node1
9. Configure the Hadoop environment variables
Edit the profile file: vim /etc/profile
# Hadoop configuration
export HADOOP_HOME=/opt/hadoop-2.7.7
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_USER_CLASSPATH_FIRST=true
Apply it immediately: source /etc/profile

10. Check that the environment variables took effect
Run hadoop version from any directory; seeing the installed Hadoop version information means the environment variables are in effect.

11. Format the NameNode
bin/hdfs namenode -format
12. Start Hadoop
sbin/start-all.sh
You can also start HDFS and YARN separately with the other scripts under sbin, as shown below.
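A sketch of starting the daemons separately with the stock Hadoop 2.7 scripts (the JobHistory Server is optional and matches the mapreduce.jobhistory settings above):
sbin/start-dfs.sh                                  # NameNode, SecondaryNameNode, DataNode
sbin/start-yarn.sh                                 # ResourceManager, NodeManager
sbin/mr-jobhistory-daemon.sh start historyserver   # optional: JobHistory Server (node1:19888)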

13. Check
List the running Java processes with jps

(screenshot: jps output showing the Hadoop daemons)

14. Access the web UI
Open http://node1:50070 in a browser to see detailed information about the Hadoop cluster.


15. Run the bundled WordCount example to verify that jobs run and compute correctly
hadoop fs -mkdir /input
hadoop fs -put README.txt /input
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar wordcount  /input   /output
Output like the following indicates that the job ran successfully:

(screenshot: WordCount job output)
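You can also inspect the result from the command line (the output path is the one used in the command above):
hadoop fs -ls /output
hadoop fs -cat /output/part-r-00000 | head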


1.2 Hive Installation

1. Download
wget http://archive.apache.org/dist/hive/hive-1.2.2/apache-hive-1.2.2-bin.tar.gz
2. Extract
tar -zxvf apache-hive-1.2.2-bin.tar.gz -C /opt/
3. Create the required directories on HDFS
hdfs dfs -mkdir -p /hive/warehouse
hdfs dfs -mkdir -p /hive/tmp
hdfs dfs -mkdir -p /hive/log
hdfs dfs -chmod -R 755 /hive/warehouse
hdfs dfs -chmod -R 755 /hive/tmp
hdfs dfs -chmod -R 755 /hive/log
4. In MySQL, set up an account that allows remote access, then create a hive database
mysql> use mysql; 
mysql> select host,user from user; 
mysql> grant all privileges on *.* to 'hive'@'%' identified by '<password for remote MySQL access>' with grant option;
mysql> flush privileges;
mysql> create database hive;
mysql> exit;
5. Configure the Hive environment variables
Edit the profile file:
export HIVE_HOME=/opt/apache-hive-1.2.2-bin
export PATH=$PATH:$HIVE_HOME/bin
6. Copy and rename the Hive configuration templates
cd $HIVE_HOME/conf
cp hive-env.sh.template hive-env.sh
cp hive-default.xml.template hive-site.xml
cp hive-log4j.properties.template hive-log4j.properties
cp hive-exec-log4j.properties.template hive-exec-log4j.properties
cp beeline-log4j.properties.template beeline-log4j.properties
7. Edit hive-env.sh as follows:
export JAVA_HOME=/usr/local/jdk1.8.0_151
export HADOOP_HOME=/opt/hadoop-2.7.7
export HIVE_HOME=/opt/apache-hive-1.2.2-bin
export HIVE_CONF_DIR=/opt/apache-hive-1.2.2-bin/conf
8. Configure hive-log4j.properties
Find the line log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter, comment it out, and change it to:
    #log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
    log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
9. Edit hive-site.xml
Find the corresponding entries in hive-site.xml and change them as follows (so that Hive metadata is stored in MySQL); leave the other entries at their defaults.
<!-- Root scratch directory on HDFS for Hive jobs -->
   <property>
      <name>hive.exec.scratchdir</name>
      <value>/hive/tmp</value>
      <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/&lt;username&gt; is created, with ${hive.scratch.dir.permission}.</description>
   </property>
   <!-- HDFS directory where Hive stores its data (databases, tables, etc.) -->
   <property>
      <name>hive.metastore.warehouse.dir</name>
      <value>/hive/warehouse</value>
      <description>location of default database for the warehouse</description>
   </property>
   <!-- Host that the HiveServer2 Thrift service binds to -->
   <property>
      <name>hive.server2.thrift.bind.host</name>
      <value>node1</value>
      <description>Bind host on which to run the HiveServer2 Thrift service.</description>
   </property>
   <!-- MySQL JDBC connection used to store the metastore -->
   <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <!--<value>jdbc:derby:;databaseName=metastore_db;create=true</value>-->
      <value>jdbc:mysql://node1:3306/hive?createDatabaseIfNotExist=true&amp;useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false</value>
       <description>JDBC connect string for a JDBC metastore</description>
   </property>
   <!-- JDBC driver class name for MySQL -->
   <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <!--<value>org.apache.derby.jdbc.EmbeddedDriver</value>-->
      <value>com.mysql.jdbc.Driver</value>
       <description>Driver class name for a JDBC metastore</description>
   </property>
   <!-- MySQL user name -->
   <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <!--<value>APP</value>-->
      <value>root</value>
       <description>Username to use against metastore database</description>
   </property>
   <!-- MySQL password -->
   <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <!--<value>mine</value>-->
      <value>123456</value>
       <description>password to use against metastore database</description>
   </property>
   <!-- Only needed for the Hive Web Interface (HWI); the war file contains the web content from the Hive source -->
   <!--<property>
      <name>hive.hwi.war.file</name>
      <value>lib/hive-hwi-1.2.2.war</value>
   </property>
   -->
10. Replace some paths in the file
Create the following local directories:
/opt/data/hive/tmp
/opt/data/hive/log
Then replace ${system:java.io.tmpdir} in hive-site.xml with /opt/data/hive/tmp.
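A quick way to do the replacement in place (a sketch; run it from $HIVE_HOME/conf, and note that depending on the template you may also want to replace ${system:user.name}):
cd $HIVE_HOME/conf
sed -i 's#${system:java.io.tmpdir}#/opt/data/hive/tmp#g' hive-site.xml
# optionally, if your template also references it:
# sed -i 's#${system:user.name}#root#g' hive-site.xml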

11. Add the MySQL driver jar
Download the driver jar and place it under $HIVE_HOME/lib:
wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar
12. Update Hadoop's jline jar: replace the jline*.jar under $HADOOP_HOME/share/hadoop/yarn/lib/ with $HIVE_HOME/lib/jline-2.12.jar, for example as sketched below.
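A sketch of the swap (assuming the old jline jar shipped with Hadoop 2.7.7 is the one under yarn/lib):
mv $HADOOP_HOME/share/hadoop/yarn/lib/jline-*.jar /tmp/
cp $HIVE_HOME/lib/jline-2.12.jar $HADOOP_HOME/share/hadoop/yarn/lib/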

13. Initialize Hive
In the bin directory, run schematool -dbType mysql -initSchema. This step creates the metastore tables in the MySQL hive database.
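To confirm the initialization, you can check that the metastore tables now exist in MySQL (credentials assumed to be the ones configured in hive-site.xml):
mysql -uroot -p -e 'USE hive; SHOW TABLES;'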

14. Start Hive
Start the metastore with hive --service metastore &. This starts a RunJar process, which you will see when running jps.

15. Use and exit
hive
hive> show databases;
OK
default
Time taken: 0.786 seconds, Fetched: 1 row(s)
hive> quit;

2. Hive Custom Delimiters

The custom code is written in Java; create a new Maven project in IDEA.

2.1 Add the following dependencies to the Maven project's pom.xml

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.2</version>
            <!--<exclusions>
                <exclusion>
                    <groupId>org.pentaho</groupId>
                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
                </exclusion>
            </exclusions>-->
        </dependency>

    </dependencies>

2.2 The custom TextInputFormat class

package yore.hive;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.*;

import java.io.IOException;

/**
 * The custom TextInputFormat class.
 * It is a modified copy of the TextInputFormat source under org.apache.hadoop.mapred.
 *
 * <pre>
 *  When Hive imports a file from HDFS it does the following:
 *      Calls the InputFormat to split the file into records; each record becomes one row.
 *      Calls the SerDe's Deserializer to split a row into individual fields.
 *
 *  See the class TextInputFormat under org.apache.hadoop.mapred in hadoop-mapreduce-client-core-2.7.7.jar.
 *  Before creating the table, run the following in the Hive CLI to enable a custom multi-character record separator:
 *      set textinputformat.record.linesep=<custom record separator string>;
 * </pre>
 *
 * Created by yore on 2019/4/3 17:56
 */
public class SQPTextInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {

    private CompressionCodecFactory compressionCodecs = null;
    //"US-ASCII""ISO-8859-1""UTF-8""UTF-16BE""UTF-16LE""UTF-16"
    private final static String defaultEncoding = "UTF-8";
    private String encoding = null;

    public void configure(JobConf jobConf) {
        this.compressionCodecs = new CompressionCodecFactory(jobConf);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        CompressionCodec codec = this.compressionCodecs.getCodec(filename);
        if (null == codec) {
            return true;
        }
        return codec instanceof SplittableCompressionCodec;
    }

    public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
        reporter.setStatus(inputSplit.toString());
        String delimiter = jobConf.get("textinputformat.record.linesep");
        this.encoding = jobConf.get("textinputformat.record.encoding",defaultEncoding);
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {//Charsets.UTF_8
            recordDelimiterBytes = delimiter.getBytes(this.encoding);
        }
        return new SQPRecordReader(jobConf, (FileSplit)inputSplit, recordDelimiterBytes);
    }

}

2.3 The custom LineRecordReader class

package yore.hive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;

/**
 * The custom LineRecordReader class.
 * It is a modified copy of the LineRecordReader source under org.apache.hadoop.mapred.
 *
 * Created by yore on 2019/4/3 18:03
 */
public class SQPRecordReader  implements RecordReader<LongWritable, Text>  {

    private static final Log LOG = LogFactory.getLog(SQPRecordReader.class.getName());

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private FSDataInputStream fileIn;
    private final Seekable filePosition;
    int maxLineLength;
    private CompressionCodec codec;
    private Decompressor decompressor;
    //field separator
    private String FieldSep;
    private static final String defaultFSep="\001";
    //"US-ASCII""ISO-8859-1""UTF-8""UTF-16BE""UTF-16LE""UTF-16"
    private final static String defaultEncoding = "UTF-8";
    private String encoding = null;

    public SQPRecordReader(Configuration job, FileSplit split) throws IOException {
        this(job, split, null);
    }

    public SQPRecordReader(Configuration job, FileSplit split, byte[] recordDelimiter) throws IOException {
        this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
        this.FieldSep = job.get("textinputformat.record.fieldsep",defaultFSep);
        this.encoding = job.get("textinputformat.record.encoding",defaultEncoding);
        this.start = split.getStart();
        this.end = (this.start + split.getLength());
        Path file = split.getPath();
        this.compressionCodecs = new CompressionCodecFactory(job);
        this.codec = this.compressionCodecs.getCodec(file);

        FileSystem fs = file.getFileSystem(job);
        this.fileIn = fs.open(file);
        if (isCompressedInput()) {
            this.decompressor = CodecPool.getDecompressor(this.codec);
            if ((this.codec instanceof SplittableCompressionCodec)) {
                SplitCompressionInputStream cIn = ((SplittableCompressionCodec)this.codec).createInputStream(this.fileIn, this.decompressor, this.start, this.end, SplittableCompressionCodec.READ_MODE.BYBLOCK);

                this.in = new LineReader(cIn, job, recordDelimiter);
                this.start = cIn.getAdjustedStart();
                this.end = cIn.getAdjustedEnd();
                this.filePosition = cIn;
            } else {
                this.in = new LineReader(this.codec.createInputStream(this.fileIn, this.decompressor), job, recordDelimiter);
                this.filePosition = this.fileIn;
            }
        } else {
            this.fileIn.seek(this.start);
            this.in = new LineReader(this.fileIn, job, recordDelimiter);
            this.filePosition = this.fileIn;
        }

        if (this.start != 0L) {
            this.start += this.in.readLine(new Text(), 0, maxBytesToConsume(this.start));
        }
        this.pos = this.start;
    }

    public SQPRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this(in, offset, endOffset, maxLineLength, null);
    }

    public SQPRecordReader(InputStream in, long offset, long endOffset, int maxLineLength, byte[] recordDelimiter) {
        this.maxLineLength = maxLineLength;
        this.in = new LineReader(in, recordDelimiter);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
        this.filePosition = null;
    }

    public SQPRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException {
        this(in, offset, endOffset, job, null);
    }

    public SQPRecordReader(InputStream in, long offset, long endOffset, Configuration job, byte[] recordDelimiter) throws IOException {
        this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
        this.in = new LineReader(in, job, recordDelimiter);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
        this.filePosition = null;
    }

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }

    private boolean isCompressedInput() {
        return this.codec != null;
    }

    private int maxBytesToConsume(long pos) {
        return isCompressedInput() ? 2147483647 : (int)Math.min(2147483647L, this.end - pos);
    }

    private long getFilePosition() throws IOException {
        long retVal;
        if ((isCompressedInput()) && (null != this.filePosition))
            retVal = this.filePosition.getPos();
        else {
            retVal = this.pos;
        }
        return retVal;
    }


    public boolean next(LongWritable longWritable, Text text) throws IOException {
        while (getFilePosition() <= this.end) {
            longWritable.set(this.pos);

            int newSize = this.in.readLine(text, this.maxLineLength, Math.max(maxBytesToConsume(this.pos), this.maxLineLength));

            if (newSize == 0) {
                return false;
            }

            if (encoding.compareTo(defaultEncoding) != 0) {
                String str = new String(text.getBytes(), 0, text.getLength(), encoding);
                text.set(str);
            }

            if (FieldSep.compareTo(defaultFSep) != 0) {
                String replacedValue = text.toString().replace(FieldSep, defaultFSep);
                text.set(replacedValue);
            }

            this.pos += newSize;
            if (newSize < this.maxLineLength) {
                return true;
            }

            LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
        }

        return false;
    }

    public long getPos() throws IOException {
        return this.pos;
    }

    public void close() throws IOException {
        try {
            if (this.in != null)
                this.in.close();
        }
        finally {
            if (this.decompressor != null)
                CodecPool.returnDecompressor(this.decompressor);
        }
    }

    public float getProgress() throws IOException {
        if(this.start == this.end){
            return 0.0F;
        }
        return Math.min(1.0F, (float)(getFilePosition() - this.start) / (float)(this.end - this.start));
    }

}

3. Usage

After starting the Hive CLI, first run a quick test and inspect the current Hive table information with the following commands:

hive> create table test1(id int);
OK
Time taken: 0.442 seconds
hive> show tables;
OK
test1
Time taken: 0.031 seconds, Fetched: 1 row(s)
hive> describe extended test1;
OK
id                      int                 
         
Detailed Table Information      Table(tableName:test1, dbName:default, owner:root, createTime:1554317591, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:null)], location:hdfs://node1:8020/hive/warehouse/test1, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{transient_lastDdlTime=1554317591}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.126 seconds, Fetched: 3 row(s)
hive>
From the information printed above we can see the classes Hive uses for input and output:
* inputFormat: org.apache.hadoop.mapred.TextInputFormat
* outputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

Next we set Hive's input format to the custom class we just developed; once that succeeds, describing the table again will show our class as the inputFormat.

3.1 Package the source code

Since this is a Maven project, it can be packaged directly with mvn clean package. In IDEA you can instead click Maven Projects --> Lifecycle --> package in the panel on the right.

3.2 Add the built jar to the environment's lib directories

Package the program as a jar and place it under the lib directories of both Hive and Hadoop. If there are multiple nodes, upload a copy to every node; a sketch follows.
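A minimal sketch of the copy, assuming the built artifact is named hive-custom-separator-1.0.jar (the actual name depends on your pom.xml) and that Hadoop's common lib directory is used for the auxiliary jar:
cp target/hive-custom-separator-1.0.jar $HIVE_HOME/lib/
cp target/hive-custom-separator-1.0.jar $HADOOP_HOME/share/hadoop/common/lib/
# Alternatively, ADD JAR <path-to-jar>; inside the Hive CLI makes the class available for the current session only.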

3.3 Set the encoding, custom field separator, and custom record separator for loading data into Hive

After adding the jar you need to re-enter the Hive CLI. In the Hive CLI, run the following commands:

//"US-ASCII""ISO-8859-1""UTF-8""UTF-16BE""UTF-16LE""UTF-16"  
set textinputformat.record.encoding=UTF-8;
// field separator
set textinputformat.record.fieldsep=,;
// record (line) separator
set textinputformat.record.linesep=|+|;  
The commands above set the input file encoding to UTF-8, the field separator to a comma, and the record separator to |+|.

3.4 Create a local file with the following test data

For example, create a file named hive_separator.txt in the home directory with the following test data:
3,Yore|+|9,Yuan|+|11,東

3.5 Create a test table

Here INPUTFORMAT is the fully qualified name of our custom TextInputFormat class, yore.hive.SQPTextInputFormat:

create table test  (  
    id string,  
    name string  
)  stored as  
INPUTFORMAT 'yore.hive.SQPTextInputFormat'  
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' ; 
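To confirm that the table was created with the custom input format, a quick check from the shell (assuming hive is on the PATH; the expected line is based on the class defined above):
hive -e "describe formatted test;" | grep -i inputformat
# expected: InputFormat: yore.hive.SQPTextInputFormat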

3.6 Load the test data into the table

load data local inpath '/root/hive_separator.txt' 
overwrite into table test;

3.7 Query the data

hive> select * from test;
OK
3       Yore
9       Yuan
11      東

(screenshot: hive-user-defined-separator query result)
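As a sanity check that the conversion happens at read time, the file sitting in the warehouse directory still contains the original single record (warehouse path assumed from hive.metastore.warehouse.dir configured above and the table name test):
hdfs dfs -cat /hive/warehouse/test/hive_separator.txt
# 3,Yore|+|9,Yuan|+|11,東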
