TOP

flink实战--读写Hbase
2019-03-05 01:41:41 】 浏览:724
Tags:flink 实战 读写 Hbase

版权声明:原创文章 欢迎参考 请勿抄袭 https://blog.csdn.net/aA518189/article/details/86544844

简介

在Flink文档中,提供connector读取源数据和把处理结果存储到外部系统中。但是没有提供数据库的connector,如果要读写数据库,官网给出了异步IO(Asynchronous I/O)专门用于访问外部数据,详细可看:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html

还有一种方法是继承RichSourceFunction,重写里面的方法,所有的数据库flink都可以通过这两种方式进行数据的读写,这里以hbase为例进行说明。

flink读写Hbase

写入HBase提供两种方式

  1. 第一种:继承RichSourceFunction重写父类方法
  2. 第二种:实现OutputFormat接口

本文主要介绍我们实现OutputFormat接口的具体步骤

实现OutputFormat接口

实现方式:

我们需要自己自定义一个hbase的操作类实现OutputFormat接口,重写里面的抽象方法,也就是下面的抽象方法

public interface OutputFormat<IT> extends Serializable {
	void configure(Configuration parameters);
	void open(int taskNumber, int numTasks) throws IOException;
	void writeRecord(IT record) throws IOException;
	void close() throws IOException;
}

抽象方法说明

configure

configure方法主要用于:配置输出格式。由于输出格式是通用的,因此是无参数的,这个方法是输出格式根据配置值设置基本字段的地方,此方法总是在实例化输出格式上首先调用,但是我们不会这个方法做实际测操作。

open

用于打开输出格式的并行实例,以存储其并行实例的结果,调用此方法时,将确保配置该方法的输出格式。所以在open方法中我们会进行hbase的连接,配置,建表等操作。

writeRecord

用于将数据写入数据源,所以我们会在这个方法中调用写入hbase的API

close

这个不用说了就是关闭数据源的连接

导入依赖

<dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>1.2.4</version>
 </dependency>
<dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-server</artifactId>
 <version>1.2.4</version>
</dependency>

实例

public class HBaseOutputFormat implements OutputFormat<Tuple5<Long, Long, Long, String, Long>> {
    private org.apache.hadoop.conf.Configuration conf = null;
    private Connection conn = null;
    private Table table = null;
    @Override
    public void configure(Configuration parameters) {
    }
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        HbaseUtil.setConf("ip1,ip2,ip3", "2181");
        conn = HbaseUtil.connection;
        HbaseUtil.createTable("flink_test2","info");
    }
    @Override
    public void writeRecord(Tuple5<Long, Long, Long, String, Long> record) throws IOException {
        Put put = new Put(Bytes.toBytes(record.f0+record.f4));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("uerid"), Bytes.toBytes(record.f0));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("behavior"), Bytes.toBytes(record.f3));
        ArrayList<Put> putList = new ArrayList<>();
        putList.add(put);
        //设置缓存1m,当达到1m时数据会自动刷到hbase
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("flink_test2"));
        params.writeBufferSize(1024 * 1024); //设置缓存的大小
        BufferedMutator mutator = conn.getBufferedMutator(params);
        mutator.mutate(putList);
        mutator.flush();
        putList.clear();
    }
    public void close() throws IOException {
        if (table != null) {
            table.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}

如何使用

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  val dataStream =  env.addSource()
...........
  dataStream.writeUsingOutputFormat(new HBaseOutputFormat());//写入Hbase
--------------------- 

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦


flink实战--读写Hbase https://www.cppentry.com/bencandy.php?fid=118&id=211668

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBase的Scan实现源码分析 下一篇图解Nosql(HBase)与传统数据库..