HBase usage (fast multi-dimensional retrieval) - near real-time search

1. Create the table (./bin/hbase shell)

create 't_user', {NAME => 'info'}
describe 't_user'
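
The same table can also be created from Java. Below is a minimal sketch using the old HBaseAdmin client, matching the pre-1.0 API used elsewhere in this article; the ZooKeeper quorum value is a placeholder for your cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateUserTable {
	public static void main(String[] args) throws Exception {
		// reads hbase-site.xml from the classpath; the quorum below is a placeholder
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "ip");

		HBaseAdmin admin = new HBaseAdmin(conf);
		HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t_user"));
		desc.addFamily(new HColumnDescriptor("info")); // single column family, as in the shell example
		if (!admin.tableExists("t_user")) {
			admin.createTable(desc);
		}
		admin.close();
	}
}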

2. Generate the data (10 million rows)

2.1 Generate the data file on HDFS

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;

// Writes tab-separated rows (userid, username, transid, pointid) to a file on HDFS.
// HdfsUtil is the author's helper class; a sketch of it follows this listing.
public class UserDataSimulation {
	public void simulate(String name, long count) throws IOException {
		HdfsUtil hdfs = new HdfsUtil();
		String uri = "hdfs://ip:9000/" + name;
		hdfs.deleteHdfsFile(uri);
		hdfs.createHdfsFile(uri);
		FSDataOutputStream os = hdfs.getOutputStream(uri);
		Writer hdfsOut = new OutputStreamWriter(os, "utf-8");
		StringBuffer sb = new StringBuffer();

		long simCount = count;
		int sbLine = 0;
		String userid;
		String username;
		String transId;
		String mPointId;
		Random rand = new Random(50);

		for (int i = 0; i < simCount; i++) {
			sbLine++;
			userid = "id" + String.format("%010d", i);
			username = "name" + String.format("%010d", i);
			// rand.nextInt(50) gives only 50 distinct transId values across the whole data set
			transId = "transId" + String.format("%010d", rand.nextInt(50));
			mPointId = "pointId" + String.format("%010d", i);
			sb.append(userid).append("\t").append(username).append("\t")
					.append(transId).append("\t").append(mPointId)
					.append("\r\n");

			// flush the buffer to HDFS every 20,000 lines
			if (sbLine == 20000) {
				hdfsOut.write(sb.toString());
				sb.setLength(0);
				sbLine = 0;
				System.out.println("LineNum:=" + i);
			}
		}
		hdfsOut.write(sb.toString());
		hdfsOut.close();
		os.close();
		System.out.println("finished, total LineNum:=" + simCount);
	}

	public static void main(String[] args) throws IOException {
		UserDataSimulation ds = new UserDataSimulation();
		ds.simulate(args[0], Long.parseLong(args[1]));
	}
}
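
HdfsUtil above is the author's own helper class and its source is not given in the article. The following is a minimal sketch of what it presumably wraps, built on the standard Hadoop FileSystem API; the method names simply mirror the calls made above.

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical reconstruction of the helper used by UserDataSimulation.
public class HdfsUtil {
	private final Configuration conf = new Configuration();

	public void deleteHdfsFile(String uri) throws IOException {
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		fs.delete(new Path(uri), true); // recursive delete; no-op if the path does not exist
	}

	public void createHdfsFile(String uri) throws IOException {
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		fs.create(new Path(uri)).close(); // create an empty file
	}

	public FSDataOutputStream getOutputStream(String uri) throws IOException {
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		return fs.create(new Path(uri), true); // overwrite and return a writable stream
	}
}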

2.2 Make the HBase jars visible to Hadoop

cp /opt/hbasedir/lib/hbase*.jar /opt/hadoopdir/share/hadoop/common/lib/
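
Copying the jars into Hadoop's lib directory works; an alternative (a suggestion, not part of the original setup) is to let the job ship its HBase dependencies through the distributed cache using TableMapReduceUtil, which the uploader below already imports:

// Inside configureJob(), after the mapper class is set:
// copies the HBase jars (and their transitive dependencies) from the client's
// classpath into the distributed cache so every map task can load them.
TableMapReduceUtil.addDependencyJars(job);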

2.3 Write the MapReduce import program

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class SampleUploader {
 
 // pre-1.0 HBase client API: one pooled HTableInterface, shared by the mapper within each task JVM
 static Configuration conf = HBaseConfiguration.create();
 static HTablePool pool = new HTablePool(conf, 3);
 static HTableInterface table = pool.getTable("t_user");

 private static final String NAME = "SampleUploader";

 // Map-only uploader: each input line is parsed and written straight to t_user;
 // nothing is emitted through the normal MapReduce output path.
 static class Uploader extends
   Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

  private long checkpoint = 10000;
  private long count = 0;
  

  @SuppressWarnings("deprecation")
  @Override
  public void map(LongWritable key, Text line, Context context)
    throws IOException {

   String[] values = line.toString().split("\t");
   if (values.length != 4) {
    return;
   }
   
   // Extract each value
   byte[] row = Bytes.toBytes(values[0]);
   byte[] username = Bytes.toBytes(values[1]);
   byte[] transid = Bytes.toBytes(values[2]);
   byte[] pointid = Bytes.toBytes(values[3]);

   // Create Put
   Put put = new Put(row);
   put.add(Bytes.toBytes("info"), Bytes.toBytes("username"), username);
   put.add(Bytes.toBytes("info"), Bytes.toBytes("transid"), transid);
   put.add(Bytes.toBytes("info"), Bytes.toBytes("pointid"), pointid);

   table.put(put);
   count++;
   // flush the client-side write buffer every 1000 puts
   if (count == 1000) {
    table.flushCommits();
    count = 0;
   }
  }
 }

 /**
  * Job configuration.
  */
 public static Job configureJob(Configuration conf, String[] args)
   throws IOException {
  Path inputPath = new Path(args[0]);
  String tableName = args[1];
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(Uploader.class);
  FileInputFormat.setInputPaths(job, inputPath);
  
  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(Uploader.class);
  
  // MapReduce still requires an output directory even though the mapper writes
  // only to HBase; clear any leftover from a previous run first (HdfsUtil as in 2.1)
  HdfsUtil hdfs = new HdfsUtil();
  hdfs.deleteHdfsFile("hdfs://ip:9000/test/mrout");

  FileOutputFormat.setOutputPath(job, new Path("/test/mrout"));
  
  job.setNumReduceTasks(0);
  return job;
 }

 /**
  * Main entry point.
  * 
  * @param args
  * The command line parameters.
  * @throws Exception
  * When running the job fails.
  */
 public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf();
  conf.setNumMapTasks(10);
  String[] otherArgs = new GenericOptionsParser(conf, args)
    .getRemainingArgs();
  if (otherArgs.length != 2) {
   System.err
     .println("Wrong number of arguments: " + otherArgs.length);
   System.err.println("Usage: " + NAME + " <input> <tablename>");
   System.exit(-1);
  }
  Job job = configureJob(conf, otherArgs);
  boolean success = job.waitForCompletion(true);
  pool.putTable(table); // return the pooled table before the JVM exits
  System.exit(success ? 0 : 1);
 }
}
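
One caveat about the mapper above: with the HBase client's default auto-flush, each table.put() is sent to the RegionServer immediately, so flushing every 1000 rows only batches anything if auto-flush is turned off. A minimal sketch of the two additions that make the batching effective, under the assumption that the pooled table handle exposes setAutoFlush (cast to HTable otherwise):

// In a static initializer (or the mapper's setup()), right after the table is obtained:
table.setAutoFlush(false); // buffer puts on the client side instead of sending them one by one

// Inside the Uploader mapper class, flush whatever is left in the buffer when the task ends:
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
   table.flushCommits(); // push any puts accumulated since the last 1000-row flush
}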


To make sure the HBase client picks up the correct runtime configuration, place hbase-site.xml under the src directory so that it is packaged into the jar; HBaseConfiguration.create() then finds it on the classpath.

2.4 Recreate the table and attach the coprocessor (./hbase shell)

disable 't_user'
drop 't_user'
create 't_user', 'info'
alter 't_user', METHOD => 'table_att','coprocessor'=>'hdfs://ip:9000/test/hbasedemo.jar|power.MyCP||'
put 't_user' , 'a111' ,'info:transid','tbbb'
describe 't_user'

The coprocessor attribute format is "jar path on HDFS|fully qualified class name|priority|arguments"; priority and arguments are left empty above. The jar (hbasedemo.jar, containing power.MyCP) must be uploaded to HDFS before the alter:

hadoop fs -rm -r /test/xxx.jar
hadoop fs -copyFromLocal /opt/jar/xxx.jar /test


The coprocessor class (power.MyCP, packaged into the jar above) is as follows:

package power;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

// RegionObserver that forwards every put on t_user to Solr:
// the rowkey and the info:transid value are indexed so rows can later be found by transid.
public class MyCP extends BaseRegionObserver {

	@Override
	public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
			Put put, WALEdit edit, Durability durability) throws IOException {
		super.postPut(e, put, edit, durability);
		String rowKeyStr = Bytes.toString(put.getRow());
		// pull the info:transid value out of this Put
		List<Cell> lstCell = put.get(Bytes.toBytes("info"), Bytes.toBytes("transid"));
		String val = "";
		for (Cell c : lstCell) {
			val = Bytes.toString(CellUtil.cloneValue(c));
		}
		// send rowkey + transid to Solr
		SolrTest solr = new SolrTest();
		solr.test(rowKeyStr, val);
	}
}
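
SolrTest is again the author's helper and is not shown. A minimal sketch of what it presumably does, using the SolrJ client bundled with Solr 4.8; the core URL and the field names (id, transid) are assumptions and must match your Solr schema.

import java.io.IOException;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

// Hypothetical reconstruction of the SolrTest helper called from the coprocessor.
public class SolrTest {
	// "collection1" is the default core of the Solr 4.x example distribution
	private static final HttpSolrServer server =
			new HttpSolrServer("http://ip:8983/solr/collection1");

	public void test(String rowKey, String transId) throws IOException {
		try {
			SolrInputDocument doc = new SolrInputDocument();
			doc.addField("id", rowKey);       // the HBase rowkey doubles as the Solr uniqueKey
			doc.addField("transid", transId); // the dimension we want to filter on later
			server.add(doc);
			server.commit(); // one commit per put keeps the example simple; batch or soft-commit in practice
		} catch (SolrServerException e) {
			throw new IOException(e);
		}
	}
}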

2.5 Start Solr 4.8

java -jar ./example/start.jar


2.6 Run the import job on the cluster: hadoop jar xxx.jar <input> <tablename>


2.7 Open a browser to search: http://ip:8983/solr
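
What step 2.7 amounts to at query time: the multi-dimensional condition goes to Solr, Solr returns the matching rowkeys, and the full rows are fetched from HBase by rowkey. A minimal sketch of that two-step lookup with SolrJ and the HBase client; the URL, core name, and field names are the same assumptions as in the SolrTest sketch, and the transid value is just an example of the format generated in section 2.1.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrDocument;

public class SearchDemo {
	public static void main(String[] args) throws Exception {
		// 1. ask Solr for the rowkeys that match the condition on the indexed dimension
		HttpSolrServer solr = new HttpSolrServer("http://ip:8983/solr/collection1");
		SolrQuery query = new SolrQuery("transid:transId0000000007");
		query.setRows(100);

		// 2. fetch the full rows from HBase by rowkey
		Configuration conf = HBaseConfiguration.create();
		HTable table = new HTable(conf, "t_user");
		for (SolrDocument doc : solr.query(query).getResults()) {
			String rowKey = (String) doc.getFieldValue("id");
			Result r = table.get(new Get(Bytes.toBytes(rowKey)));
			System.out.println(rowKey + " -> "
					+ Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("username"))));
		}
		table.close();
		solr.shutdown();
	}
}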


Note:

Conditional queries against 1 million rows of data return within 10 ms.






