HBase是一个高可靠、高性能、面向列、可伸缩的分布式数据库,主要用来存储非结构化和半结构化的松散数据。Spark支持对HBase数据库中的数据进行读写。
一、创建一个HBase表
启动Hadoop的HDFS:
$ cd /usr/local/hadoop
$ ./sbin/start-dfs.sh
启动HBase:
$ cd /usr/local/hbase
$ ./bin/start-hbase.sh //启动HBase
$ ./bin/hbase shell //启动HBase Shell
创建一个student表,把数据保存到HBase中时,可以把id作为行健(Row Key),把info作为列族,把name、gender和age作为列。
hbase> create 'student','info'
//首先录入student表的第一个学生记录
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
//然后录入student表的第二个学生记录
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'
二、配置Spark
把HBase安装目录下的lib目录中的一些jar文件拷贝到Spark安装目录中,这些都是编程时需要引入的jar包。需要拷贝的jar文件包括:所有hbase开头的jar文件、guava-12.0.1.jar、htrace-core3.1.0-incubating.jar和protobuf-java-2.5.0.jar。
$ cd /usr/local/spark/jars
$ mkdir hbase
$ cd hbase
$ cp /usr/local/hbase/lib/hbase*.jar ./
$ cp /usr/local/hbase/lib/guava-12.0.1.jar ./
$ cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
$ cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
三、编写程序读取HBase数据
如果要让Spark读取HBase,就需要使用SparkContext提供的newAPIHadoopRDD这个API将表的内容以RDD的形式加载到Spark中。
新建一个SparkOperateHBase.scala代码文件:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkOperateHBase {
def main(args: Array[String]) {
val conf = HBaseConfiguration.create()
val sc = new SparkContext(new SparkConf()) //设置查询的表名
conf.set(TableInputFormat.INPUT_TABLE, "student")
val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = stuRDD.count()
println("Students RDD Count:" + count)
stuRDD.cache()
//遍历输出
stuRDD.foreach({ case (_,result) =>
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
})
}
}
val stuRDD = sc.newAPIHadoopRDD()语句执行后,Spark会从HBase数据库读取数据,保存到名称为stuRDD的RDD中,stuRDD的每个RDD元素都是(ImmutableBytesWritable,Result)类型的键值对。我们所需要的student表的数据都被封装到了Result中,因此,stuRDD.foreach()在遍历每个RDD元素时,通过case (_,result),就忽略了key,只获取value(即result)。
利用sbt工具对SparkOperateHBase.scala代码文件进行编译打包,在执行打包命令之前,需要创建一个simple.sbt文件,并录入下面的内容:
$ /usr/local/spark/bin/spark-submit \
> --driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf \
> --class "SparkOperateHBase" \
> /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
在spark-submit命令中,必须使用“–driver-class-path”参数指定依赖jar包的路径,而且必须把“/usr/local/hbase/conf”也加到路径中。
结果:
Students RDD Count:2
Row key:1 Name:Xueqian Gender:F Age:23
Row key:2 Name:Weiliang Gender:M Age:24
四、编写程序向HBase写入数据
编写应用程序把两个学生信息插入到HBase的student表中。
新建一个SparkWriteHBase.scala代码文件:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
object SparkWriteHBase {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "student"
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//下面这行代码用于构建两行记录
val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27"))
val rdd = indataRDD.map(_.split(",")).map{arr=>{
//设置行键(row key)的值
val put = new Put(Bytes.toBytes(arr(0)))
//设置info:name列的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
//设置info:gender列的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))
//设置info:age列的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))
//构建一个键值对,作为rdd的一个元素
(new ImmutableBytesWritable, put)
}}
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
}
}
在把RDD数据写入HBase表中时,关键环节是完成RDD到表模式(Schema)的转换。在HBase中的表模式的一般形式如下:
row_key cf:col_1 cf:col_2其中,row_key表示行键,cf表示列族,col_1和col_2表示列。
在Spark中,我们操作的是RDD,val indataRDD = sc.makeRDD(Array(“3,Rongcheng,M,26”,“4,Guanhua,M,27”))语句执行后,indataRDD中的每个RDD元素都是一个字符串,即"3,Rongcheng,M,26"和"4,Guanhua,M,27"。val rdd = indataRDD.map(.split(‘,’))语句执行后,rdd中的每个RDD元素是数组,即Array(“3”,“Rongcheng”,“M”,“26”)和Array(“4”,“Guanhua”,“M”,“27”)。我们需要将RDD[Array(String,String,String,String)] 转换成 RDD[(ImmutableBytesWritable,Put)]。所以,val rdd = indataRDD.map(.split(“,”)).map{}的大括号中的语句,就定义了一个匿名函数做这个转换工作。
使用sbt工具对SparkWriteHBase.scala代码文件进行编译打包,然后,使用spark-submit命令提交运行:
$ /usr/local/spark/bin/spark-submit \
> --driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf \
> --class "SparkWriteHBase" \
> /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
执行成功以后,切换到HBase Shell中,执行如下命令查看student表:
结果:
ROW COLUMN+CELL
1 column=info:age, timestamp=1479640712163, value=23
1 column=info:gender, timestamp=1479640704522, value=F
1 column=info:name, timestamp=1479640696132, value=Xueqian
2 column=info:age, timestamp=1479640752474, value=24
2 column=info:gender, timestamp=1479640745276, value=M
2 column=info:name, timestamp=1479640732763, value=Weiliang
3 column=info:age, timestamp=1479643273142, value=\x00\x00\x00\x1A
3 column=info:gender, timestamp=1479643273142, value=M
3 column=info:name, timestamp=1479643273142, value=Rongcheng
4 column=info:age, timestamp=1479643273142, value=\x00\x00\x00\x1B
4 column=info:gender, timestamp=1479643273142, value=M
4 column=info:name, timestamp=1479643273142, value=Guanhua
4 row(s) in 0.3240 seconds
两条新的记录已经被成功插入到HBase的student表中。
文章来源:《Spark编程基础》 作者:林子雨
文章内容仅供学习交流,如有侵犯,联系删除哦!