Writing Spark Data to HBase


Personally tested and working.

Launch spark-shell with the hbase-spark jar on the classpath:

spark-shell --jars /home/wsy/jars/hbase-spark-1.2.0-cdh5.7.1.jar
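A quick, illustrative way to confirm the jar actually made it onto the classpath once the shell is up (see also note 2 at the end):

// Throws ClassNotFoundException if hbase-spark is missing from the classpath.
Class.forName("org.apache.hadoop.hbase.spark.HBaseContext")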

def readHbase(sc: org.apache.spark.SparkContext, readTableName: String = "USER") = {
  val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, readTableName)
  // Scan the table through TableInputFormat; each record is (rowkey, Result).
  val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf,
    classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  import org.apache.hadoop.hbase.util.Bytes
  val m2cRDD = hbaseRDD.map(r => {
    val mobile_no: String = Bytes.toString(r._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("mobile_no")))
    val user_id: String = Bytes.toString(r._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("user_id")))
    val is_validated: String = Bytes.toString(r._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("is_validated")))
    // Keep only validated rows with both columns present; mark the rest with "-1" sentinels.
    if (is_validated == "true" && mobile_no != null && user_id != null) {
      (mobile_no, user_id, is_validated)
    } else {
      ("-1", "-1", "-1")
    }
  })
  m2cRDD
}
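A quick sanity check of the reader before launching the full job; the names come from the code above and take(5) is purely illustrative:

// Sample a few rows to verify connectivity and the column mapping.
readHbase(sc, "USER").take(5).foreach(println)

// Count rows that survived the is_validated filter (full scan, slow on large tables).
val validCount = readHbase(sc, "USER").filter(_._1 != "-1").count()
println("valid rows: " + validCount)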

// The per-row Put method below is extremely inefficient: 140 million rows took 9.1 hours.
// Prefer the DataFrame method for reading and writing HBase from spark-shell; see the linked post.
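A minimal sketch of that DataFrame route, assuming this hbase-spark build ships the "org.apache.hadoop.hbase.spark" data source and HBaseTableCatalog; the catalog JSON and option names follow the Apache hbase-spark convention and may differ in this CDH build:

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

// Hypothetical catalog for the MOBILE2CMPAYID table; adjust to the real schema.
val catalog = s"""{
  "table":{"namespace":"default", "name":"MOBILE2CMPAYID"},
  "rowkey":"key",
  "columns":{
    "mobile_no":{"cf":"rowkey", "col":"key", "type":"string"},
    "user_id":{"cf":"cf", "col":"user_id", "type":"string"},
    "is_validated":{"cf":"cf", "col":"is_validated", "type":"string"}
  }
}"""

// Read: scans and filters are pushed down to HBase where possible.
val df = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.hadoop.hbase.spark")
  .load()

// Write (assumption: this build supports DataFrame writes; the table must already exist).
df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.hadoop.hbase.spark")
  .save()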

def putWriteHbase(sc: org.apache.spark.SparkContext,
                  m2cRDD: org.apache.spark.rdd.RDD[(String, String, String)],
                  writeTableName: String = "MOBILE2CMPAYID") = {
  m2cRDD.foreachPartition(iter => {
    // One connection per partition; HBase connections are not serializable.
    val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val cn = org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(hbaseConf)
    val hbaseTable = cn.getTable(org.apache.hadoop.hbase.TableName.valueOf(writeTableName))
    iter.foreach(row => {
      import org.apache.hadoop.hbase.util.Bytes
      val mobile_no: String = row._1
      val user_id: String = row._2
      val is_validated: String = row._3
      // Skip the "-1" sentinel rows produced by readHbase.
      if (mobile_no != "-1") {
        val put = new org.apache.hadoop.hbase.client.Put(Bytes.toBytes(mobile_no))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("mobile_no"), Bytes.toBytes(mobile_no))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("user_id"), Bytes.toBytes(user_id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("is_validated"), Bytes.toBytes(is_validated))
        hbaseTable.put(put)
      }
    })
    hbaseTable.close()
    cn.close()
  })
}
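If switching to the DataFrame or bulk-load path is not an option, batching mutations through a BufferedMutator instead of calling Table.put per row usually helps; a sketch under the same table and column names (bufferedWriteHbase is a hypothetical name, not from the original post):

def bufferedWriteHbase(sc: org.apache.spark.SparkContext,
                       m2cRDD: org.apache.spark.rdd.RDD[(String, String, String)],
                       writeTableName: String = "MOBILE2CMPAYID") = {
  m2cRDD.foreachPartition(iter => {
    import org.apache.hadoop.hbase.util.Bytes
    val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val cn = org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(hbaseConf)
    // BufferedMutator buffers puts client-side and flushes them to the server in batches.
    val mutator = cn.getBufferedMutator(org.apache.hadoop.hbase.TableName.valueOf(writeTableName))
    iter.foreach { case (mobile_no, user_id, is_validated) =>
      if (mobile_no != "-1") {
        val put = new org.apache.hadoop.hbase.client.Put(Bytes.toBytes(mobile_no))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("mobile_no"), Bytes.toBytes(mobile_no))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("user_id"), Bytes.toBytes(user_id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("is_validated"), Bytes.toBytes(is_validated))
        mutator.mutate(put)
      }
    }
    mutator.flush()
    mutator.close()
    cn.close()
  })
}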

val m2cRDD = readHbase(sc, "USER")
putWriteHbase(sc, m2cRDD, "MOBILE2CMPAYID")

def HFileWriteHbase(sc: org.apache.spark.SparkContext,
                    m2cRDD: org.apache.spark.rdd.RDD[(String, String)],
                    writeTableName: String = "tmp_mobile2cmpayidHFile") = {
  import org.apache.hadoop.hbase.util.Bytes
  // Shape each row as (rowkey, List((qualifier, value))) for the bulk-load API.
  val toByteArrays = m2cRDD.map(row => {
    val mobile_no: String = row._1
    val user_id: String = row._2
    val rowkeyBytes = Bytes.toBytes(mobile_no)
    val kvs = List(
      (Bytes.toBytes("mobile_no"), Bytes.toBytes(mobile_no)),
      (Bytes.toBytes("user_id"), Bytes.toBytes(user_id))
    )
    (rowkeyBytes, kvs)
  })
  val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
  val hbaseContext = new org.apache.hadoop.hbase.spark.HBaseContext(sc, hbaseConf)
  val tableName = org.apache.hadoop.hbase.TableName.valueOf(writeTableName)
  val stagingFolder = "hdfs:///tmp/wsy/hfile/test"
  import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
  // Write HFiles to the staging folder, emitting one KeyFamilyQualifier per cell.
  toByteArrays.hbaseBulkLoad(hbaseContext, tableName,
    t => {
      val rowKey = t._1
      val seq = new scala.collection.mutable.ListBuffer[(org.apache.hadoop.hbase.spark.KeyFamilyQualifier, Array[Byte])]()
      for (kv <- t._2) {
        val qualifier = kv._1
        val value = kv._2
        // Skip the "-1" sentinel values produced by readHbase.
        if (Bytes.toString(value) != "-1") {
          val keyFamilyQualifier = new org.apache.hadoop.hbase.spark.KeyFamilyQualifier(rowKey, Bytes.toBytes("cf"), qualifier)
          seq.append((keyFamilyQualifier, value))
        }
      }
      seq.iterator
    },
    stagingFolder)
  // Move the finished HFiles into the live table's regions.
  val load = new org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(hbaseConf)
  load.run(Array(stagingFolder, writeTableName))
}

putWriteHbase(sc, readHbase(sc))
// readHbase returns (mobile_no, user_id, is_validated) triples, while HFileWriteHbase
// takes (mobile_no, user_id) pairs, so drop the flag before bulk loading.
HFileWriteHbase(sc, readHbase(sc).map(r => (r._1, r._2)))
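After the bulk load completes, a full-scan count is a simple (if slow) way to confirm the rows landed; this reuses the same TableInputFormat setup as readHbase:

val verifyConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
verifyConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
verifyConf.set("hbase.zookeeper.property.clientPort", "2181")
verifyConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, "tmp_mobile2cmpayidHFile")
val loadedRows = sc.newAPIHadoopRDD(verifyConf,
  classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]).count()
println("rows loaded: " + loadedRows)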

Recreate the target table from the HBase shell before loading:

disable 'tmp_mobile2cmpayid'
drop 'tmp_mobile2cmpayid'
create 'tmp_mobile2cmpayid','cf'
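For a 140-million-row bulk load, creating the target table pre-split (instead of the single-region create above) spreads the HFile writing across region servers. A sketch using the Admin API; the split points here are made up and should be derived from the real mobile_no distribution:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
val conn = ConnectionFactory.createConnection(conf)
val admin = conn.getAdmin

val desc = new HTableDescriptor(TableName.valueOf("tmp_mobile2cmpayid"))
desc.addFamily(new HColumnDescriptor("cf"))
// Hypothetical split points based on leading digits of mobile_no rowkeys.
val splits = Array("13", "15", "17", "19").map(Bytes.toBytes)
admin.createTable(desc, splits)
admin.close()
conn.close()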

A few things to note:

1. hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
   Without this line, the job fails at runtime because the client cannot locate ZooKeeper.
   [Screenshot in the original post: error caused by the missing ZooKeeper quorum setting]

2. spark-shell --jars /home/wsy/jars/hbase-spark-1.2.0-cdh5.7.1.jar
   Without this jar, classes under the org.apache.hadoop.hbase.spark package cannot be found.
