Copyright notice: This is the blogger's original article; reproduction without the blogger's permission is prohibited. https://blog.csdn.net/u010666884/article/details/51722275
HBase and MapReduce
Now that you have an understanding of MapReduce and HBase in distributed mode,
let’s look at them together. There are three different ways of interacting with HBase
from a MapReduce application. HBase can be used as a
data source at the beginning of
a job, as a data sink at the end of a job, or as a
shared resource for your tasks. None of
these modes of interaction are particularly mysterious. The third, however, has some
interesting use cases we’ll address shortly.
All the code snippets used in this section are examples of using the Hadoop
MapReduce API. There are no HBase client HTable or HTablePool instances involved.
Those are embedded in the special input and output formats you’ll use here. You will,
however, use the Put, Delete, and Scan objects with which you’re already familiar.
Creating and configuring the Hadoop Job and Configuration instances can be messy
work. These snippets emphasize the HBase portion of that work. You’ll see a full working
example in section 3.5.
HBase as a source
In the example MapReduce application, you read lines from log files sitting in the
HDFS. Those files, specifically the directory in HDFS containing those files, act as the
data source for the MapReduce job. The schema of that data source describes
[k1,v1] tuples as [line number:line]. The TextInputFormat class configured as
part of the job defines this schema. The relevant code from the TimeSpent example
looks like this:
Configuration conf = new Configuration();
Job job = new Job(conf, "TimeSpent");
...
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat defines the [k1,v1] type for line number and line as the types
LongWritable and Text, respectively. LongWritable and Text are serializable Hadoop
wrapper types over Java’s Long and String. The associated map task definition is typed
for consuming these input pairs:
public void map(LongWritable key, Text value,
Context context) {
...
}
HBase provides similar classes for consuming data out of a table. When mapping over
data in HBase, you use the same Scan class you used before. Under the hood, the row
range defined by the Scan is broken into pieces and distributed to all the workers (figure
3.12).
This is identical to the splitting you saw in figure 3.1. Creating a Scan instance for
scanning over all rows in a table from MapReduce looks like this:
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("twits"), Bytes.toBytes("twit"));
In this case, you're asking the scanner to return only the twit column from the twits table.
Just like consuming text lines, consuming HBase rows requires a schema. All jobs
reading from an HBase table accept their [k1,v1] pairs in the form of [rowkey:scan
result]. That's the same Result you get when scanning with the regular HBase client API.
They’re presented using the types ImmutableBytesWritable and Result. The provided
TableMapper wraps up these details for you, so you’ll want to use it as the base
class for your Map Step implementation:
protected void map(
ImmutableBytesWritable rowkey,
Result result,
Context context) {
...
}
The next step is to take your Scan instance and wire it into MapReduce. HBase provides
the handy TableMapReduceUtil class to help you initialize the Job instance:
TableMapReduceUtil.initTableMapperJob(
"twits",
scan,
Map.class,
ImmutableBytesWritable.class,
Result.class,
job);
This takes your job-configuration object and sets up the HBase-specific input format
(TableInputFormat). It then configures MapReduce to read from the table using
your Scan instance. It also wires in your Map class implementation. From
here, you write and run the MapReduce application as normal.
When you run a MapReduce job as described here, one map task is launched for
every region in the HBase table. In other words, the map tasks are partitioned such that
each map task reads from a region independently. The JobTracker tries to schedule
map tasks as close to the regions as possible to take advantage of data locality.
HBase as a sink
Writing to an HBase table from MapReduce (figure 3.13) as a data sink is similar to
reading from a table in terms of implementation.
HBase provides similar tooling to simplify the configuration. Let's first look at an
example of sink configuration in a standard MapReduce application.
In TimeSpent, the values of [k3,v3] generated by the aggregators are
[UserID:TotalTime]. In the MapReduce application, they’re of the Hadoop serializable
types Text and LongWritable, respectively. Configuring output types is similar to
configuring input types, with the exception that the [k3,v3] output types can’t be
inferred by the OutputFormat:
Configuration conf = new Configuration();
Job job = new Job(conf, "TimeSpent");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
...
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
In this case, no line numbers are specified. Instead, the TextOutputFormat schema creates
a tab-separated output file containing first the UserID and then the TotalTime.
What’s written to disk is the String representation of both types.
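As a small sketch of that on-disk form: TextOutputFormat's default separator is a tab, so each output line is the key's toString(), a tab, then the value's toString(). The class name and sample values below are illustrative, not from the book's code:

```java
// Sketch of the line TextOutputFormat writes for one [k3,v3] pair:
// key.toString() + "\t" + value.toString(). Sample values are made up.
public class TextOutputLineDemo {
    static String formatLine(String userId, long totalTime) {
        // Mirrors TextOutputFormat's default tab-separated record layout
        return userId + "\t" + Long.toString(totalTime);
    }

    public static void main(String[] args) {
        System.out.println(formatLine("Yvonn66", 30));
    }
}
```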
The Context object contains the type information. The reduce function is defined as
public void reduce(Text key, Iterable<LongWritable> values,
Context context) {
...
}
When writing to HBase from MapReduce, you’re again using the regular HBase API.
The types of [k3,v3] are assumed to be a rowkey and an object for manipulating
HBase. That means the values of v3 must be either Puts or Deletes. Because both of
these object types include the relevant rowkey, the value of k3 is ignored. Just as the
TableMapper wraps up these details for you, so does the TableReducer:
protected void reduce(
ImmutableBytesWritable rowkey,
Iterable<Put> values,
Context context) {
...
}
The last step is wiring your reducer into the job configuration. You need to specify
the destination table along with all the appropriate types. Once again, it’s
TableMapReduceUtil to the rescue; it sets up the TableOutputFormat for you! You
use IdentityTableReducer, a provided class, because you don’t need to perform any
computation in the Reduce Step:
TableMapReduceUtil.initTableReducerJob(
"users",
IdentityTableReducer.class,
job);
Now your job is completely wired up, and you can proceed as normal. Unlike the case
where map tasks are reading from HBase, tasks don’t necessarily write to a single region.
The writes go to the region that is responsible for the rowkey that is being written by the
reduce task. The default partitioner that assigns the intermediate keys to the reduce
tasks doesn’t have knowledge of the regions and the nodes that are hosting them and
therefore can’t intelligently assign work to the reducers such that they write to the local
regions. Moreover, depending on the logic you write in the reduce task, which doesn’t
have to be the identity reducer, you might end up writing all over the table.
HBase as a shared resource
Reading from and writing to HBase using MapReduce is handy. It gives us a harness
for batch processing over data in HBase. A few predefined MapReduce jobs ship with
HBase; you can explore their source for more examples of using HBase from
MapReduce. But what else can you do with HBase?
One common use of HBase is to support a large map-side join. In this scenario,
you’re reading from HBase as an indexed resource accessible from all map tasks. What
is a map-side join, you ask? How does HBase support it? Excellent questions!
Let’s back up a little. A join is common practice in data manipulation. Joining two
tables is a fundamental concept in relational databases. The idea behind a join is to
combine records from the two different sets based on like values in a common attribute.
That attribute is often called the join key.
For example, think back to the TimeSpent MapReduce job. It produces a dataset
containing a UserID and the TotalTime they spent on the TwitBase site.
You’d like to know the ratio of how much time a user spends on the site to their total
twit count. Although this is an easy question to answer, right now the relevant data is
split between two different datasets. You’d like to join this data such that all the information
about a user is in a single row. These two datasets share a common attribute:
UserID. This will be the join key. The result of performing the join and dropping
unused fields looks like this:
UserID TwitCount TimeSpent
Yvonn66 48 30s
Mario23 56 2s
Rober4 2 6s
Masan46 47 35s
Joins in the relational world are a lot easier than in MapReduce. Relational engines
enjoy many years of research and tuning around performing joins. Features like
indexing help optimize join operations. Moreover, the data typically resides on a single
physical server. Joining across multiple relational servers is far more complicated
and isn’t common in practice. A join in MapReduce means joining on data spread
across multiple servers. But the semantics of the MapReduce framework make it easier
than trying to do a join across different relational database systems. There are a couple
of different variations of each type, but a join implementation is either
map-side or
reduce-side. They're referred to as map- or reduce-side because that's the task where
records from the two sets are linked. Reduce-side joins are more common because
they’re easier to implement. We’ll describe those first.
REDUCE-SIDE JOIN
A reduce-side join takes advantage of the intermediate Shuffle Step to collocate relevant
records from the two sets. The idea is to map over both sets and emit tuples keyed
on the join key. Once together, the reducer can produce all combinations of values.
Let’s build out the algorithm.
Given the sample data, the pseudo-code of the map task for consuming the
TimeSpent data looks like this:
map_timespent(line_num, line):
    userid, timespent = split(line)
    record = {"TimeSpent" : timespent,
              "type" : "TimeSpent"}
    emit(userid, record)
This map task splits the k1 input line into the UserID and TimeSpent values. It then
constructs a dictionary with type and TimeSpent attributes. As [k2,v2] output, it produces
[UserID:dictionary].
A map task for consuming the Users data is similar. The only difference is that it
drops a couple of unrelated fields:
map_users(line_num, line):
    userid, name, email, twitcount = split(line)
    record = {"TwitCount" : twitcount,
              "type" : "TwitCount"}
    emit(userid, record)
Both map tasks use UserID as the value for k2. This allows Hadoop to group all records
for the same user. The reduce task has everything it needs to complete the join:
reduce(userid, records):
    timespent_recs = []
    twitcount_recs = []
    for rec in records:
        if rec.type == "TimeSpent":
            rec.del("type")
            timespent_recs.push(rec)
        else:
            rec.del("type")
            twitcount_recs.push(rec)
    for timespent in timespent_recs:
        for twitcount in twitcount_recs:
            emit(userid, merge(timespent, twitcount))
The reduce task groups records of identical type and produces all possible combinations
of the two types as k3. For this specific example, you know there will be only one
record of each type, so you can simplify the logic. You also can fold in the work of producing
the ratio you want to calculate:
reduce(userid, records):
    for rec in records:
        rec.del("type")
    record = merge(records)
    emit(userid, ratio(record["TimeSpent"], record["TwitCount"]))
This new and improved reduce task produces the new, joined dataset:
UserID ratio
Yvonn66 30s:48
Mario23 2s:56
Rober4 6s:2
Masan46 35s:47
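To make the whole flow concrete, here is a plain-Java simulation of the reduce-side join, outside Hadoop: both "map" phases emit (userid, record) pairs, an in-memory TreeMap stands in for the Shuffle Step, and the "reduce" merges each user's records and emits the ratio. The class and method names are illustrative assumptions, the sample values come from the tables above, and because there is exactly one record of each type per user, the type tag is folded into the record's attribute name and the merge collapses to a simple map union:

```java
import java.util.*;

// In-memory simulation of the reduce-side join described in the text.
public class ReduceSideJoinDemo {
    // Shuffle stand-in: join key -> all records emitted under that key
    static Map<String, List<Map<String, String>>> shuffle = new TreeMap<>();

    static void emit(String userid, Map<String, String> record) {
        shuffle.computeIfAbsent(userid, k -> new ArrayList<>()).add(record);
    }

    // Map phase over the TimeSpent dataset: emit (userid, {TimeSpent: ...})
    static void mapTimespent(String line) {
        String[] f = line.split("\t");              // userid, timespent
        emit(f[0], new HashMap<>(Map.of("TimeSpent", f[1])));
    }

    // Map phase over the Users dataset: keep only the TwitCount field
    static void mapUsers(String line) {
        String[] f = line.split("\t");              // userid, name, email, twitcount
        emit(f[0], new HashMap<>(Map.of("TwitCount", f[3])));
    }

    // Reduce phase: all records for one userid arrive together; merge and
    // emit the TimeSpent:TwitCount ratio.
    static String reduce(String userid, List<Map<String, String>> records) {
        Map<String, String> merged = new HashMap<>();
        for (Map<String, String> rec : records) merged.putAll(rec);
        return userid + " " + merged.get("TimeSpent") + ":" + merged.get("TwitCount");
    }

    public static void main(String[] args) {
        mapTimespent("Yvonn66\t30s");
        mapTimespent("Mario23\t2s");
        mapUsers("Yvonn66\tname\temail\t48");
        mapUsers("Mario23\tname\temail\t56");
        for (Map.Entry<String, List<Map<String, String>>> e : shuffle.entrySet())
            System.out.println(reduce(e.getKey(), e.getValue()));
    }
}
```

The TreeMap grouping plays the role Hadoop's sort-and-shuffle plays for real jobs: it guarantees all records sharing a join key reach the same reduce call.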
There you have it: the reduce-side join in its most basic glory. One big problem with
the reduce-side join is that it requires all [k2,v2] tuples to be shuffled and sorted. For
our toy example, that’s no big deal. But if the datasets are very, very large, with millions
of pairs per value of k2, the overhead of that step can be huge.
Reduce-side joins require shuffling and sorting data between map and reduce tasks.
This incurs I/O costs, specifically network, which happens to be the slowest aspect of
any distributed system. Minimizing this network I/O will improve join performance.
This is where the map-side join can help.
MAP-SIDE JOIN
The map-side join is a technique that isn’t as general-purpose as the reduce-side join.
It assumes the map tasks can look up random values from one dataset while they iterate
over the other. If you happen to want to join two datasets where at least one of them
can fit in memory of the map task, the problem is solved: load the smaller dataset into
a hash-table so the map tasks can access it while iterating over the other dataset. In
these cases, you can skip the Shuffle and Reduce Steps entirely and emit your final
output from the Map Step. Let’s go back to the same example. This time you’ll put the
Users dataset into memory. The new map_timespent task looks like this:
map_timespent(line_num, line):
    users_recs = read_users("/path/to/users.csv")
    userid, timespent = split(line)
    record = {"TimeSpent" : timespent}
    record = merge(record, users_recs[userid])
    emit(userid, ratio(record["TimeSpent"], record["TwitCount"]))
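A plain-Java sketch of the same idea, with a HashMap standing in for the loaded Users dataset (class name and sample values are illustrative, not from the book's code):

```java
import java.util.*;

// Simulates the map-side join: the smaller Users dataset is loaded into a
// hash-table once, and each "map" call looks up the record's userid directly,
// so no Shuffle or Reduce Step is needed.
public class MapSideJoinDemo {
    // One call per input record, like map_timespent in the pseudo-code
    static String mapTimespent(String line, Map<String, Integer> usersRecs) {
        String[] parts = line.split("\t");
        String userid = parts[0], timespent = parts[1];
        int twitcount = usersRecs.get(userid);   // the in-memory join lookup
        return userid + " " + timespent + ":" + twitcount;  // the ratio
    }

    public static void main(String[] args) {
        // Stand-in for reading the Users dataset: userid -> TwitCount
        Map<String, Integer> usersRecs = new HashMap<>();
        usersRecs.put("Yvonn66", 48);
        usersRecs.put("Mario23", 56);

        // Each "input line" of the TimeSpent dataset
        for (String line : new String[] {"Yvonn66\t30s", "Mario23\t2s"})
            System.out.println(mapTimespent(line, usersRecs));
    }
}
```

Note that the final output comes straight out of the map function; nothing is shuffled across the network.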
Compared to the last version, this looks like cheating! Remember, though, you can
only get away with this approach when you can fit one of the datasets entirely into
memory. In this case, your join will be much faster.
There are of course implications to doing joins like this. For instance, each map task
is processing a single split, which is equal to one HDFS block (typically 64–128 MB), but
the join dataset that it loads into memory is 1 GB. Now, 1 GB can certainly fit in memory,
but the cost involved in creating a hash-table for a 1 GB dataset for every 128 MB of data
being joined makes it not such a good idea.
MAP-SIDE JOIN WITH HBASE
Where does HBase come in? We originally described HBase as a giant hash-table, remember?
Look again at the map-side join implementation. Replace users_recs with the Users
table in TwitBase. Now you can join over the massive Users table and massive TimeSpent
dataset in record time! The map-side join using HBase looks like this:
map_timespent(line_num, line):
    users_table = HBase.connect("Users")
    userid, timespent = split(line)
    record = {"TimeSpent" : timespent}
    record = merge(record, users_table.get(userid, "info:twitcount"))
    emit(userid, ratio(record["TimeSpent"], record["info:twitcount"]))
Think of this as an external hash-table that each map task has access to. You don’t need
to create that hash-table object for every task. You also avoid all the network I/O
involved in the Shuffle Step necessary for a reduce-side join.
There’s a lot more to distributed joins than we’ve covered in this section. They’re so
common that Hadoop ships with a contrib JAR called hadoop-datajoin to make
things easier. You should now have enough context to make good use of it and also
take advantage of HBase for other MapReduce optimizations.