设为首页 加入收藏

TOP

如何将hbase的数据转移到MySQL;
2018-12-17 01:46:22 】 浏览:111
Tags:如何 hbase 数据 转移 MySQL
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zjq777/article/details/79950871

import com.bing.tools.Constant;
import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

/**
*
* @author zjq
*/

public static void main(String[] args) {
try {
Class.forName("com.mysql.jdbc.Driver");
conn=DriverManager.getConnection

("jdbc:mysql://hostname/BDdata zeroDateTimeBehavior=convertToNull&characterEncodin g=utf8&allowMultiQueries=true", "root", "123");
conn.setAutoCommit(false);
run();
} catch (Exception e) {
e.printStackTrace();
}

}


public class Test01 {

protected static Connection conn;

public void run() {
try {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "Fnb:dianpu");
table.setAutoFlush(false);
SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("58"), Bytes.toBytes("date"),
CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("更新于2018-3-15")));
scvf.setLatestVersionOnly(true);
scvf.setFilterIfMissing(true);
FilterList filterlist = new FilterList();
filterlist.addFilter(scvf);
Scan s = new Scan();
FilterList f = new FilterList(FilterList.Operator.MUST_PASS_ALL, filterlist);
s.setFilter(f);
ResultScanner rs = table.getScanner(s);

List<HashMap<String, String>> list = new ArrayList<>();
int count = 0;
for (Result res : rs) {
String sql = "insert into fbd_store (from_web,title,date,total_price,total_price,rent_price,acreage,store_type + ",width,store_status,depth,business,height,address,agent_company,agent_name,agent_tell,peitao,intro,pic_urls,floor) "
+ " values(,,,,,,,,,,,,,,,,,,,,)";
PreparedStatement pstmt = conn.prepareStatement(sql);
pstmt.setString(1, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("from_web"))));
pstmt.setString(2, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("title"))));
pstmt.setString(3, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("date"))).replaceAll("更新于", ""));
pstmt.setString(4, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("total_price"))));
pstmt.setString(5, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("price"))));
pstmt.setString(6, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("rent_price"))));
pstmt.setString(7, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("acreage"))));
pstmt.setString(8, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("store_type"))));
pstmt.setString(9, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("width"))));
pstmt.setString(10, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("store_status"))));
pstmt.setString(11, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("depth"))));
pstmt.setString(12, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("business"))));
pstmt.setString(13, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("height"))));
pstmt.setString(14, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("address"))));
pstmt.setString(15, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("agent_company"))));
pstmt.setString(16, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("agent_name"))));
pstmt.setString(17, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("agent_tell"))));
pstmt.setString(18, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("peitao"))));
pstmt.setString(19, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("intro"))));
pstmt.setString(20, Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("floor"))));
String[] urls = Bytes.toString((res.getValue(Bytes.toBytes("58"), Bytes.toBytes("pic_urls")))).split(",");
for (int i = 0; i < urls.length; i++) {
save_pic(urls[i], Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("date"))).replaceAll("更新于", ""));

}
String rowkey = Bytes.toString(res.getRow());
String key = Bytes.toString(res.getValue(Bytes.toBytes("58"), Bytes.toBytes("date")));
System.out.println(rowkey + " " + key);
count++;
}
} catch (Exception e) {
e.printStackTrace();
}
}

//由于hbase里存图片是(String,String...)形式

//

public void save_pic(String id, String date) {
try {
String address = Constant.Image_Path_Win + File.separator + id + ".jpg";
File file = new File(address);
FileInputStream fis = new FileInputStream(file);
String sql = "insert into pic_info(id,img,date) values(,,)";
PreparedStatement ps = conn.prepareStatement(sql);
ps.setLong(1, Long.valueOf(id));
ps.setBinaryStream(2, fis, file.length());
ps.setString(3, date);
ps.executeUpdate();
ps.close();
conn.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇大数据开发面试:HBase的模式Sche.. 下一篇mysql数据导入hbase

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目