Scala + Spark HBase operation example
Copyright notice: This is the blogger's original article and may not be reproduced without permission. https://blog.csdn.net/ldds_520/article/details/51860232


1. Software versions: Hadoop 2.6.4, HBase 1.0.3, Spark 1.6.0, Scala 2.10.5

Dependency JAR versions used by the program (note that the POM builds against spark-core_2.10 1.1.0, even though the cluster runs Spark 1.6.0):

hbase-server 1.0.3, spark-core_2.10 1.1.0


2. Maven configuration (pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>spark-hbase2</groupId>
  <artifactId>spark-hbase2</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderful Scala app</description>
  <inceptionYear>2015</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses> 

	<repositories>
	
		<repository>
			<id>maven-apache-repo2</id>
			<name>Maven Plugin Repository 2</name>
			<url>http://repo2.maven.org/maven2</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
      
		 <repository>
			<id>maven-central</id>
			<name>Maven Central Repository</name>
			<url>http://central.maven.org/maven2</url>
			<releases>
				<enabled>true</enabled>
			</releases> 
			<snapshots>
				<enabled>false</enabled>
			</snapshots> 
		</repository> 
		<repository>
			<id>maven-com-sun</id>
			<name>Maven2 repository of SUN</name>
			<url>http://download.java.net/maven/2</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
		<repository>
			<id>logicaldoc.sourceforge.net</id>
			<name>logicaldoc.sourceforge.net</name>
			<url>http://logicaldoc.sourceforge.net/maven/</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
		<repository>
			<id>server.oosnmp.net</id>
			<name>server.oosnmp.net</name>
			<url>https://server.oosnmp.net/dist/release</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
		<repository>
			<id>codehaus.org</id>
			<name>codehaus.org</name>
			<url>http://repository.codehaus.org</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
		<repository>
			<id>sonatype.org</id>
			<name>sonatype.org</name>
			<url>https://oss.sonatype.org/content/groups/jetty</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>

  <properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <!-- Keep the Scala version in line with the environment above (Scala 2.10.5)
         and the spark-core_2.10 dependency below -->
    <scala.version>2.10.5</scala.version>
    <scala.compat.version>2.10</scala.compat.version>
  </properties>

  <dependencies>
  
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>2.4.16</version>
      <scope>test</scope>
    </dependency> 
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>2.2.4</version>
      <scope>test</scope>
    </dependency>
    
<!--      <dependency>
		<groupId>org.apache.hbase</groupId>
		<artifactId>hbase-client</artifactId>  
		<version>1.0.3</version>
	</dependency>  
    
	<dependency>
		<groupId>org.apache.hbase</groupId>
		<artifactId>hbase-server</artifactId>
		<version>1.0.3</version>
	</dependency>   -->
   <dependency>
		<groupId>org.apache.hbase</groupId>
		<artifactId>hbase-server</artifactId>
		<version>1.0.3</version>
	</dependency> 
	     <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<!-- 	<dependency>
	    <groupId>org.apache.hadoop</groupId>
	    <artifactId>hadoop-client</artifactId>
	    <version>2.6.0</version>
	</dependency> 
  -->  
    <dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.1.0</version>
	</dependency>  
    
    <dependency>
   		 <groupId>jdk.tools</groupId>
   		 <artifactId>jdk.tools</artifactId>
   		 <version>1.6</version>
   		 <scope>system</scope>
   		 <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
    
  </dependencies>

 <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
       <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.5.5</version>
				<configuration>
					<appendAssemblyId>false</appendAssemblyId>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<!-- Uncomment to bake the main class into the manifest:
					<archive>
						<manifest>
							<mainClass>spark_hbase.spark_hbase.App</mainClass>
						</manifest>
					</archive>
					-->
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<!-- 'single' is the goal intended for lifecycle binding -->
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin> 
      
       
       <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.10</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      
      
    </plugins>
  </build>
  </project>

3. Scala code

package spark_hbase.spark_hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.client.{Get, HBaseAdmin, HTable, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._

object SparkHBase1 extends Serializable {
 def main(args: Array[String]): Unit = {
   
    val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    val sc = new SparkContext(sparkConf)
    var table_name = "test"
    val conf = HBaseConfiguration.create()
    
    conf.set("hbase.rootdir", "hdfs://wwwwww-1/hbase")
		conf.set("hbase.zookeeper.quorum", "11.11.131.19,11.11.131.20,11.11.131.21")
	  conf.set("hbase.zookeeper.property.clientPort", "2181")
		conf.set("hbase.master", "60001")
    conf.set(TableInputFormat.INPUT_TABLE, table_name)
    
    val hadmin = new HBaseAdmin(conf)
    
    if (!hadmin.isTableAvailable("test")) {
      println("Table does not exist; creating it")
      val tableDesc = new HTableDescriptor("test")
      tableDesc.addFamily(new HColumnDescriptor("basic".getBytes()))
      hadmin.createTable(tableDesc)
    } else {
      println("Table already exists; skipping creation")
    }
    
    // Write five sample rows into column family "basic", qualifier "name"
    val table = new HTable(conf, "test")
    for (i <- 1 to 5) {    
      var put = new Put(Bytes.toBytes("row" + i))   
      put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("value " + i))  
      table.put(put)   
    }    
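    // flushCommits pushes any client-side buffered Puts to the region servers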
    table.flushCommits()  
    
    // Scan: expose the whole table as an RDD of (rowkey, Result) via TableInputFormat
    val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],    
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],    
      classOf[org.apache.hadoop.hbase.client.Result])    
    
    val count = hbaseRDD.count()    
    println("HBase RDD Count:" + count)    
    hbaseRDD.cache()    
    
    // A second RDD over the same table; only used by the commented-out sample below
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    
    // Point read: fetch a single row with Get
    val g = new Get("row1".getBytes)
    val result = table.get(g)
    val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
    println("GET row1: " + value)
    
    
    print("------------------------scan----------")
    val res = hbaseRDD.take(count.toInt)
    // j runs from 1 to count inclusive so that every row in res is printed
    for (j <- 1 to count.toInt) {
      println("j: " + j)
      val rs = res(j - 1)._2
      val kvs = rs.raw
      for (kv <- kvs)
        println("rowkey:" + new String(kv.getRow()) +
          " cf:" + new String(kv.getFamily()) +
          " column:" + new String(kv.getQualifier()) +
          " value:" + new String(kv.getValue()))
    }
    
/*    println("-------------------------")
    println("--take1" + hBaseRDD.take(1))
    println("--count" + hBaseRDD.count())*/
     
    
    //insert_hbase(100002,3)
  }
  // Write a row into HBase
 /* def insert_hbase(news_id:Int,type_id:Int): Unit ={
    var table_name = "news"
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum","192.168.110.233, 192.168.110.234, 192.168.110.235");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    val table = new HTable(conf, table_name)
    val hadmin = new HBaseAdmin(conf)
    val row = Bytes.toBytes(news_id.toString())
    val p = new Put(row)
    p.add(Bytes.toBytes("content"),Bytes.toBytes("typeid"),Bytes.toBytes(type_id.toString()))
    table.put(p)
    table.close()
  } */
}
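
As a variation on the scan loop above, the same hbaseRDD can be mapped straight to readable strings, which avoids indexing into the array returned by take(). A minimal sketch against the same HBase 1.0.3 / Spark APIs used above (the output format here is illustrative):

    // Map each (rowkey, Result) pair from the scan RDD to a plain string,
    // then collect to the driver and print. Converting to String inside the
    // map avoids shipping non-serializable Result objects out of the RDD.
    val rows = hbaseRDD.map { case (_, result) =>
      val rowkey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name")))
      "rowkey=" + rowkey + " name=" + name
    }
    rows.collect().foreach(println)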


4. Build

mvn clean scala:compile compile package -X

The assembly plugin is bound to the package phase, so this produces a self-contained jar-with-dependencies; because appendAssemblyId is set to false, it is written as target/spark-hbase2-0.0.1-SNAPSHOT.jar with no -jar-with-dependencies suffix.

For the Eclipse environment setup, see: http://blog.csdn.net/ldds_520/article/details/51830721


5. Package and run:

spark/bin/spark-submit --master spark://11.11.131.119:7077 --name spark-hbase --class "spark_hbase.spark_hbase.SparkHBase1" /spark-hbase2-0.0.1-SNAPSHOT.jar localhost 9999
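
The trailing arguments "localhost 9999" are passed through to main(), but SparkHBase1 as written ignores them. If they were meant to be consumed, a minimal, hypothetical sketch (parseArgs is illustrative, not part of the original code):

    // Hypothetical helper: read the host/port arguments passed on the spark-submit line.
    def parseArgs(args: Array[String]): (String, Int) = args match {
      case Array(host, port, _*) => (host, port.toInt)
      case _ => ("localhost", 9999) // defaults matching the submit command above
    }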


6. Hadoop and HBase version compatibility:


HBase     Hadoop
0.92.0    1.0.0
0.92.1    1.0.0
0.92.2    1.0.3
0.94.0    1.0.2
0.94.1    1.0.3
0.94.2    1.0.3
0.94.3    1.0.4
0.94.4    1.0.4
0.94.5    1.0.4
0.94.9    1.2.0
0.95.0    1.2.0

With hadoop 1.2 + hbase 0.95.0 + hive 0.11.0, HBase and Hive are incompatible: creating a Hive table mapped to HBase fails with a "pair"-related exception.
With hadoop 1.2 + hbase 0.94.9 + hive 0.10.0 there is no problem; this combination resolves the incompatibility above.




The official compatibility matrix is listed below. Meaning of the symbols:
S = supported and tested,
X = not supported,
NT = should work, but not tested.



                       HBase-0.92.x  HBase-0.94.x  HBase-0.96.x  HBase-0.98.x[a]  HBase-1.0.x
Hadoop-0.20.205        S             X             X             X                X
Hadoop-0.22.x          S             X             X             X                X
Hadoop-1.0.0-1.0.2[c]  X             X             X             X                X
Hadoop-1.0.3+          S             S             S             X                X
Hadoop-1.1.x           NT            S             S             X                X
Hadoop-0.23.x          X             S             NT            X                X
Hadoop-2.0.x-alpha     X             NT            X             X                X
Hadoop-2.1.0-beta      X             NT            S             X                X
Hadoop-2.2.0           X             NT[d]         S             S                NT
Hadoop-2.3.x           X             NT            S             S                NT
Hadoop-2.4.x           X             NT            S             S                S
Hadoop-2.5.x           X             NT            S             S                S


