HBase Study Notes 2: HBase and MapReduce
Copyright notice: this is an original post by the blogger and may not be reproduced without permission. https://blog.csdn.net/u010666884/article/details/51722275

HBase and MapReduce

Now that you have an understanding of MapReduce and HBase in distributed mode, let’s look at them together. There are three different ways of interacting with HBase from a MapReduce application. HBase can be used as a data source at the beginning of a job, as a data sink at the end of a job, or as a shared resource for your tasks. None of these modes of interaction are particularly mysterious. The third, however, has some interesting use cases we’ll address shortly.

All the code snippets used in this section are examples of using the Hadoop MapReduce API. There are no HBase client HTable or HTablePool instances involved. Those are embedded in the special input and output formats you’ll use here. You will, however, use the Put, Delete, and Scan objects with which you’re already familiar. Creating and configuring the Hadoop Job and Configuration instances can be messy work. These snippets emphasize the HBase portion of that work. You’ll see a full working example in section 3.5.

HBase as a source

In the example MapReduce application, you read lines from log files sitting in HDFS. Those files, or more specifically the directory in HDFS containing them, act as the data source for the MapReduce job. The schema of that data source describes [k1,v1] tuples as [line number:line]. The TextInputFormat class configured as part of the job defines this schema. The relevant code from the TimeSpent example looks like this:

Configuration conf = new Configuration();
Job job = new Job(conf, "TimeSpent");
...
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat defines the [k1,v1] types for line number and line as LongWritable and Text, respectively. LongWritable and Text are serializable Hadoop wrapper types over Java’s Long and String. The associated map task definition is typed for consuming these input pairs:

public void map(LongWritable key, Text value,
                Context context) {
  ...
}

HBase provides similar classes for consuming data out of a table. When mapping over data in HBase, you use the same Scan class you used before. Under the hood, the row-range defined by the Scan is broken into pieces and distributed to all the workers (figure 3.12). This is identical to the splitting you saw in figure 3.1. Creating a Scan instance for scanning over all rows in a table from MapReduce looks like this:

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("twits"), Bytes.toBytes("twit"));

In this case, you’re asking the scanner to return only the text from the twits table. Just like consuming text lines, consuming HBase rows requires a schema. All jobs reading from an HBase table accept their [k1,v1] pairs in the form of [rowkey:scan result]. That’s the same scanner Result you get when consuming the regular HBase API. The pairs are presented using the types ImmutableBytesWritable and Result. The provided TableMapper wraps up these details for you, so you’ll want to use it as the base class for your Map Step implementation:

protected void map(
    ImmutableBytesWritable rowkey,
    Result result,
    Context context) {
  ...
}

The next step is to take your Scan instance and wire it into MapReduce. HBase provides the handy TableMapReduceUtil class to help you initialize the Job instance:

TableMapReduceUtil.initTableMapperJob(
    "twits",
    scan,
    Map.class,
    ImmutableBytesWritable.class,
    Result.class,
    job);

This takes your job-configuration object and sets up the HBase-specific input format (TableInputFormat). It then configures MapReduce to read from the table using your Scan instance. It also wires in your Map and Reduce class implementations. From here, you write and run the MapReduce application as normal.
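To make that concrete, here is a minimal sketch of what a mapper class wired in this way might look like. It is not part of the TimeSpent example: the word-counting logic, the class name, and the Text/LongWritable output types are illustrative assumptions; only the twits:twit column matches the Scan above. With these output types, the fourth and fifth arguments to initTableMapperJob would be Text.class and LongWritable.class.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Illustrative mapper: counts words across all twits read from the table.
public class TwitWordCountMapper extends TableMapper<Text, LongWritable> {

  private static final byte[] FAMILY = Bytes.toBytes("twits");
  private static final byte[] QUALIFIER = Bytes.toBytes("twit");
  private static final LongWritable ONE = new LongWritable(1);

  @Override
  protected void map(ImmutableBytesWritable rowkey, Result result, Context context)
      throws IOException, InterruptedException {
    // Pull the twit text out of the Result handed to us by TableInputFormat.
    byte[] cell = result.getValue(FAMILY, QUALIFIER);
    if (cell == null) {
      return; // this row has no twit text
    }
    for (String word : Bytes.toString(cell).split("\\s+")) {
      context.write(new Text(word), ONE);
    }
  }
}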

When you run a MapReduce job as described here, one map task is launched for every region in the HBase table. In other words, the map tasks are partitioned such that each map task reads from a region independently. The JobTracker tries to schedule map tasks as close to the regions as possible to take advantage of data locality.

HBase as a sink

Writing to an HBase table from MapReduce (figure 3.13) as a data sink is similar to reading from a table in terms of implementation. HBase provides similar tooling to simplify the configuration. Let’s first look at an example of sink configuration in a standard MapReduce application.

In TimeSpent, the values of [k3,v3] generated by the aggregators are [UserID:TotalTime]. In the MapReduce application, they’re of the Hadoop serializable types Text and LongWritable, respectively. Configuring output types is similar to configuring input types, with the exception that the [k3,v3] output types can’t be inferred by the OutputFormat:

Configuration conf = new Configuration();
Job job = new Job(conf, "TimeSpent");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
...
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

In this case, no line numbers are specified. Instead, the TextOutputFormat schema creates a tab-separated output file containing first the UserID and then the TotalTime. What’s written to disk is the String representation of both types. The Context object contains the type information. The reduce function is defined as

public void reduce(Text key, Iterable<LongWritable> values,
                   Context context) {
  ...
}

When writing to HBase from MapReduce, you’re again using the regular HBase API. The types of [k3,v3] are assumed to be a rowkey and an object for manipulating HBase. That means the values of v3 must be either Puts or Deletes. Because both of these object types include the relevant rowkey, the value of k3 is ignored. Just as the TableMapper wraps up these details for you, so does the TableReducer:

protected void reduce(
    ImmutableBytesWritable rowkey,
    Iterable<Put> values,
    Context context) {
  ...
}
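For the TimeSpent job itself the provided IdentityTableReducer (used below) is all you need, but if you did have to build the Put objects yourself, a reducer might look roughly like the following sketch. The class name, the info:timespent column, and the use of addColumn (spelled put.add on pre-1.0 clients) are assumptions for illustration, not part of the example.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Illustrative sink reducer: sums the time-spent values for a user and
// writes the total to HBase as a single Put.
public class TimeSpentSinkReducer
    extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {

  @Override
  protected void reduce(Text userId, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long total = 0;
    for (LongWritable v : values) {
      total += v.get();
    }
    Put put = new Put(Bytes.toBytes(userId.toString()));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("timespent"), Bytes.toBytes(total));
    // The output key is effectively ignored; the Put carries its own rowkey.
    context.write(new ImmutableBytesWritable(put.getRow()), put);
  }
}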

The last step is wiring your reducer into the job configuration. You need to specify the destination table along with all the appropriate types. Once again, it’s TableMapReduceUtil to the rescue; it sets up the TableOutputFormat for you! You use IdentityTableReducer, a provided class, because you don’t need to perform any computation in the Reduce Step:

TableMapReduceUtil.initTableReducerJob(
    "users",
    IdentityTableReducer.class,
    job);

Now your job is completely wired up, and you can proceed as normal. Unlike the case where map tasks are reading from HBase, tasks don’t necessarily write to a single region. The writes go to the region that is responsible for the rowkey being written by the reduce task. The default partitioner that assigns the intermediate keys to the reduce tasks doesn’t have knowledge of the regions and the nodes hosting them, and therefore can’t intelligently assign work to the reducers such that they write to local regions. Moreover, depending on the logic you write in the reduce task, which doesn’t have to be the identity reducer, you might end up writing all over the table.

HBase as a shared resource

Reading from and writing to HBase using MapReduce is handy. It gives us a harness for batch processing over data in HBase. A few predefined MapReduce jobs ship with HBase; you can explore their source for more examples of using HBase from MapReduce. But what else can you do with HBase?

One common use of HBase is to support a large map-side join. In this scenario, you’re reading from HBase as an indexed resource accessible from all map tasks. What is a map-side join, you ask? How does HBase support it? Excellent questions!

Let’s back up a little. A join is common practice in data manipulation. Joining two tables is a fundamental concept in relational databases. The idea behind a join is to combine records from the two different sets based on like values in a common attribute. That attribute is often called the join key.

For example, think back to the TimeSpent MapReduce job. It produces a dataset containing a UserID and the TotalTime they spent on the TwitBase site. You’d like to know the ratio of how much time a user spends on the site to their total twit count. Although this is an easy question to answer, right now the relevant data is split between two different datasets. You’d like to join this data such that all the information about a user is in a single row. These two datasets share a common attribute: UserID. This will be the join key. The result of performing the join and dropping unused fields looks like this:

UserID    TwitCount  TimeSpent
Yvonn66   48         30s
Mario23   56         2s
Rober4    2          6s
Masan46   47         35s

Joins in the relational world are a lot easier than in MapReduce. Relational engines enjoy many years of research and tuning around performing joins. Features like indexing help optimize join operations. Moreover, the data typically resides on a single physical server. Joining across multiple relational servers is far more complicated and isn’t common in practice. A join in MapReduce means joining on data spread across multiple servers. But the semantics of the MapReduce framework make it easier than trying to do a join across different relational database systems. A join implementation is either map-side or reduce-side, and there are a couple of different variations of each type. They’re referred to as map- or reduce-side because that’s the task where records from the two sets are linked. Reduce-side joins are more common because they’re easier to implement. We’ll describe those first.

REDUCE-SIDE JOIN

A reduce-side join takes advantage of the intermediate Shuffle Step to collocate relevant records from the two sets. The idea is to map over both sets and emit tuples keyed on the join key. Once the records are together, the reducer can produce all combinations of values. Let’s build out the algorithm. Given the sample data, the pseudo-code of the map task for consuming the TimeSpent data looks like this:

map_timespent(line_num, line):
  userid, timespent = split(line)
  record = {"TimeSpent" : timespent,
            "type" : "TimeSpent"}
  emit(userid, record)

This map task splits the k1 input line into the UserID and TimeSpent values. It then constructs a dictionary with type and TimeSpent attributes. As [k2,v2] output, it produces [UserID:dictionary]. A map task for consuming the Users data is similar. The only difference is that it drops a couple of unrelated fields:

map_users(line_num, line):
  userid, name, email, twitcount = split(line)
  record = {"TwitCount" : twitcount,
            "type" : "TwitCount"}
  emit(userid, record)

Both map tasks use UserID as the value for k2. This allows Hadoop to group all records for the same user. The reduce task has everything it needs to complete the join:

reduce(userid, records):
  timespent_recs = []
  twitcount_recs = []

  for rec in records:
    if rec.type == "TimeSpent":
      rec.del("type")
      timespent_recs.push(rec)
    else:
      rec.del("type")
      twitcount_recs.push(rec)

  for timespent in timespent_recs:
    for twitcount in twitcount_recs:
      emit(userid, merge(timespent, twitcount))

The reduce task groups records of identical type and produces all possible combinations of the two types as k3. For this specific example, you know there will be only one record of each type, so you can simplify the logic. You can also fold in the work of producing the ratio you want to calculate:

reduce(userid, records):
  for rec in records:
    rec.del("type")
  rec = merge(records)
  emit(userid, ratio(rec["TimeSpent"], rec["TwitCount"]))

This new and improved reduce task produces the new, joined dataset:

UserID    ratio
Yvonn66   30s:48
Mario23   2s:56
Rober4    6s:2
Masan46   35s:47

There you have it: the reduce-side join in its most basic glory. One big problem with the reduce-side join is that it requires all [k2,v2] tuples to be shuffled and sorted. For our toy example, that’s no big deal. But if the datasets are very, very large, with millions of pairs per value of k2, the overhead of that step can be huge.

Reduce-side joins require shuffling and sorting data between map and reduce tasks. This incurs I/O costs, specifically network I/O, which happens to be the slowest aspect of any distributed system. Minimizing this network I/O will improve join performance. This is where the map-side join can help.

MAP-SIDE JOIN

The map-side join is a technique that isn’t as general-purpose as the reduce-side join. It assumes the map tasks can look up random values from one dataset while they iterate over the other. If you happen to want to join two datasets where at least one of them can fit in the memory of each map task, the problem is solved: load the smaller dataset into a hash-table so the map tasks can access it while iterating over the other dataset. In these cases, you can skip the Shuffle and Reduce Steps entirely and emit your final output from the Map Step. Let’s go back to the same example. This time you’ll put the Users dataset into memory. The new map_timespent task looks like this:

map_timespent(line_num, line):
  users_recs = read_users("/path/to/users.csv")
  userid, timespent = split(line)
  record = {"TimeSpent" : timespent}
  record = merge(record, users_recs[userid])
  emit(userid, ratio(record["TimeSpent"], record["TwitCount"]))

Compared to the last version, this looks like cheating! Remember, though, you can only get away with this approach when you can fit one of the datasets entirely into memory. In this case, your join will be much faster.
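In Java, that in-memory variant might look roughly like the following sketch. The file location reuses the placeholder path from the pseudo-code above, and the comma-separated field layout (userid, name, email, twitcount for Users; userid, timespent for the input lines) is an assumption for illustration.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative map-side join with the smaller Users dataset held in memory.
public class InMemoryJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

  private final Map<String, String> twitCounts = new HashMap<String, String>();

  @Override
  protected void setup(Context context) throws IOException {
    // Load the Users dataset into a hash-table once per map task.
    FileSystem fs = FileSystem.get(context.getConfiguration());
    BufferedReader reader = new BufferedReader(new InputStreamReader(
        fs.open(new Path("/path/to/users.csv")), StandardCharsets.UTF_8));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        String[] fields = line.split(",");
        twitCounts.put(fields[0], fields[3]); // userid -> twitcount
      }
    } finally {
      reader.close();
    }
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] fields = value.toString().split(",");
    String userId = fields[0];
    String timeSpent = fields[1];
    String twitCount = twitCounts.get(userId);
    if (twitCount != null) {
      // Emit the joined record (the time-spent to twit-count ratio) straight from the map task.
      context.write(new Text(userId), new Text(timeSpent + ":" + twitCount));
    }
  }
}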

There are of course implications to doing joins like this. For instance, suppose each map task processes a single split, which is equal to one HDFS block (typically 64–128 MB), but the join dataset it loads into memory is 1 GB. Now, 1 GB can certainly fit in memory, but the cost of building a hash-table over a 1 GB dataset for every 128 MB of data being joined makes it not such a good idea.

MAP-SIDE JOIN WITH HBASE

Where does HBase come in? We originally described HBase as a giant hash-table, remember? Look again at the map-side join implementation. Replace users_recs with the Users table in TwitBase. Now you can join over the massive Users table and the massive TimeSpent dataset in record time! The map-side join using HBase looks like this:

map_timespent(line_num, line):
  users_table = HBase.connect("Users")
  userid, timespent = split(line)
  record = {"TimeSpent" : timespent}
  record = merge(record, users_table.get(userid, "info:twitcount"))
  emit(userid, ratio(record["TimeSpent"], record["info:twitcount"]))

Think of this as an external hash-table that each map task has access to. You don’t need to create that hash-table object for every task. You also avoid all the network I/O involved in the Shuffle Step necessary for a reduce-side join.
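A Java version of that HBase-backed lookup might look roughly like the following sketch. It assumes the newer Connection/Table client API (rather than the HTable class mentioned earlier), a table named "users" with an info:twitcount column, and tab-separated TimeSpent input lines; all of those are assumptions for illustration.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative map-side join that treats the users table as an external hash-table.
public class HBaseJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

  private Connection connection;
  private Table users;

  @Override
  protected void setup(Context context) throws IOException {
    // One connection per task, reused for every Get issued by map().
    connection = ConnectionFactory.createConnection(
        HBaseConfiguration.create(context.getConfiguration()));
    users = connection.getTable(TableName.valueOf("users"));
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] fields = value.toString().split("\t");
    String userId = fields[0];
    String timeSpent = fields[1];

    // Random-access lookup against HBase while iterating over the TimeSpent lines.
    Result r = users.get(new Get(Bytes.toBytes(userId)));
    byte[] twitCount = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("twitcount"));
    if (twitCount != null) {
      context.write(new Text(userId),
          new Text(timeSpent + ":" + Bytes.toString(twitCount)));
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException {
    users.close();
    connection.close();
  }
}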

There’s a lot more to distributed joins than we’ve covered in this section. They’re so common that Hadoop ships with a contrib JAR called hadoop-datajoin to make things easier. You should now have enough context to make good use of it and also take advantage of HBase for other MapReduce optimizations.
