设为首页 加入收藏

TOP

利用mapreduce计算框架向hbase插入数据(python脚本)
2018-11-28 17:35:58 】 浏览:20
Tags:利用 mapreduce 计算 框架 hbase 插入 数据 python 脚本
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/smithliang1996/article/details/78510261

mapreduce计算框架是hadoop项目中的一个分布式计算框架,他的强大的吞吐能力和批量的数据输出使之成为离线数据挖掘的首选框架。
hbase是一个nosql数据库,是参考了google内部的bigtable模型设计出来的一个nosql数据库,他减少了数据的冗余和使查询的效率提高,是实现数据挖掘的相关数据库的nosql数据库的首选语言,且底层数据存储在hadoop中的hdfs中。

使用版本:hadoop1.0和hbase0.98

hbase使用java编写的,所以要使用java开发的话,可以直接调用hbase计算框架,但是要使用java以外的语言开发的话,必须使用相关的依赖组件。

Hbase的python操作
thrift就是使用python操作Hbase数据库的依赖组件

安装thriftserver:

要使用thrift,得把这些插件安装完成

yum -y install automake libtool bison pkgconfig gcc-c++ boost-devel libevent-devel alib-devel python-devel ruby-devel openssl-devel boost-devel.x86_64 libevent-devel.x86_64

接下来安装thrift

wget http://archive.apache.org/dist/thrift/0.8.0/thrift-0.8.0.tar.gz
//下载thrift插件
tar zxvf thrift-0.8.0.tar.gz
//解压thrift

然后进行配置和编译

#./configure --with-cpp=no --with-ruby=no
#make
#make install
//检查是否编译完成
thrift


file
Options:
  -version    Print the compiler version
  -o dir      Set the output directory for gen-* packages
               (default: current directory)
  -out dir    Set the ouput location for generated files.
               (no gen-* folder will be created)
  -I dir      Add a directory to the list of directories
                searched for include directives
  -nowarn     Suppress all compiler warnings (BAD!)
  -strict     Strict compiler warnings on
  -v[erbose]  Verbose mode
  -r[ecurse]  Also generate included files
  -debug      Parse debug trace to stdout
  --allow-neg-keys  Allow negative field keys (Used to preserve protocol
                compatibility with older .thrift files)
  --allow-64bit-consts  Do not print warnings about using 64-bit constants
  --gen STR   Generate code with a dynamically-registered generator.
                STR has the form language[:key1=val1[,key2,[key3=val3]]].
                Keys and values are options passed to the generator.
                Many options will not require values.

Available generators (and options):
  as3 (AS3):
    bindable:          Add [bindable] metadata to all the struct classes.
  c_glib (C, using GLib):
  cocoa (Cocoa):
    log_unexpected:  Log every time an unexpected field ID or type is encountered.
  cpp (C++):
    cob_style:       Generate "Continuation OBject"-style classes.
    no_client_completion:
                     Omit calls to completion__() in CobClient class.
    templates:       Generate templatized reader/writer methods.
    pure_enums:      Generate pure enums instead of wrapper classes.
    dense:           Generate type specifications for the dense protocol.
    include_prefix:  Use full include paths in generated files.
  csharp (C#):
  delphi (delphi):
    ansistr_binary:  Use AnsiString as binary properties.
  erl (Erlang):
  go (Go):
  hs (Haskell):
  html (HTML):
  java (Java):
    beans:           Members will be private, and setter methods will return void.
    private-members: Members will be private, but setter methods will return 'this' like usual.
    nocamel:         Do not use CamelCase field accessors with beans.
    hashcode:        Generate quality hashCode methods.
    android_legacy:  Do not use java.io.IOException(throwable) (available for Android 2.3 and above).
    java5:           Generate Java 1.5 compliant code (includes android_legacy flag).
  javame (Java ME):
  js (java script):
    jquery:          Generate jQuery compatible code.
    node:            Generate node.js compatible code.
  ocaml (OCaml):
  perl (Perl):
  php (PHP):
    inlined:         Generate PHP inlined files
    server:          Generate PHP server stubs
    autoload:        Generate PHP with autoload
    oop:             Generate PHP with object oriented subclasses
    rest:            Generate PHP REST processors
    namespace:       Generate PHP namespaces as defined in PHP >= 5.3
  py (Python):
    new_style:       Generate new-style classes.
    twisted:         Generate Twisted-friendly RPC services.
    utf8strings:     Encode/decode strings using utf8 in the generated code.
    slots:           Generate code using slots for instance members.
    dynamic:         Generate dynamic code, less code generated but slower.
    dynbase=CLS      Derive generated classes from class CLS instead of TBase.
    dynexc=CLS       Derive generated exceptions from CLS instead of TExceptionBase.
    dynimport='from foo.bar import CLS'
                     Add an import line to generated code to find the dynbase class.
  rb (Ruby):
  st (Smalltalk):
  xsd (XSD):

然后下载hbase源码:产生针对python的hbase的api
下载hbase源码

wget http://mirrors.hust.edu.cn/apache/hbase/0.98.24/hbase-0.98.24-src.tar.gz
//下载hbase源码并且解压,进入hbase解压后的目录
find . -name Hbase.thrift
./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
//进入目录
 cd ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift
//然后进行获取秘钥
thrift -gen py Hbase.thrift
//产生一个gen-py
Cp -raf gen  -py/hbase/ home/badou/hbase_test

然后启动thriftserver

//在hbase的bin目录底下,启动thrift服务
hbase-daemon.sh start thrift
//9090是thrift的端口
jps
        3489 JobTracker
        5416 Jps
        4539 Main
        3248 NameNode
        4103 HRegionServer
        4003 HMaster
        3399 SecondaryNameNode
        3888 HQuorumPeer
        5336 ThriftServer

然后使用mapreduce进行插入数据到hbase数据库中

map.py

#!/usr/bin/python

import os
import sys

os.system('tar xvzf hbase.tgz > /dev/null')
os.system('tar xvzf thrift.tgz > /dev/null')

reload(sys)
sys.setdefaultencoding('utf-8')

sys.path.append("./")

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import *

transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)

protocol = TBinaryProtocol.TBinaryProtocol(transport)

client = Hbase.Client(protocol)

transport.open()

tableName = 'new_music_table'

def mapper_func():
    for line in sys.stdin:
        ss = line.strip().split('\t')
        if len(ss) != 2:
        continue
    key = ss[0].strip()
    val = ss[1].strip()

    rowKey = key

    mutations = [Mutation(column="meta-data:name", value=val), \
            Mutation(column="flags:is_valid", value="TRUE")]

    client.mutateRow(tableName, rowKey, mutations, None)


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

run.sh

HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"

INPUT_FILE_PATH_1="/input.data"
OUTPUT_PATH="/output_hbase"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func" \
    -file ./map.py \
    -file "./hbase.tgz" \
    -file "./thrift.tgz"

这里需要注意得是hbase解压后的目录必须放在同一级别下才能使用

先在本机测试一下

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import *

transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)

protocol = TBinaryProtocol.TBinaryProtocol(transport)

client = Hbase.Client(protocol)

transport.open()


#==============================

base_info_contents = ColumnDescriptor(name='meta-data:', maxVersions=1)
other_info_contents = ColumnDescriptor(name='flags:', maxVersions=1)

client.createTable('new_music_table', [base_info_contents, other_info_contents])

print client.getTableNames()
//现将input.data数据传入hadoop中
hadoop fs -put input.data /

还有就是当使用mapreduce向hbase中插入数据时候,必须每个节点有hbase插件和thriftserver,这里就存在一个问题,你不能将所有的节点都安装上thrift和hbase,所以使用压缩方式向各个节点分发并且将所有使用thriftserver的地方指向本地机器,

这时候一个完整的利用mapreduce计算框架向hbase插入数据(python脚本)才算完成。
下次会使用hive语句向hbase插入数据。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBase集群间不停服迁移数据 下一篇HBase 修改TTL 属性释放空间

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目