Tested and verified to work.
spark-shell --jars /home/wsy/jars/hbase-spark-1.2.0-cdh5.7.1.jar
// Read the USER table as an RDD of (mobile_no, user_id, is_validated).
def readHbase(sc: org.apache.spark.SparkContext, readTableName: String = "USER") = {
  val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, readTableName)
  val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf,
    classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  import org.apache.hadoop.hbase.util.Bytes
  val m2cRDD = hbaseRDD.map(r => {
    // r._2 is the HBase Result; read the three columns from family "cf"
    val mobile_no: String = Bytes.toString(r._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("mobile_no")))
    val user_id: String = Bytes.toString(r._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("user_id")))
    val is_validated: String = Bytes.toString(r._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("is_validated")))
    if (is_validated == "true" && mobile_no != null && user_id != null) {
      (mobile_no, user_id, is_validated)
    } else {
      // placeholder tuple for rows that should not be written back
      ("-1", "-1", "-1")
    }
  })
  m2cRDD
}
// The Put-based write below is extremely slow: 140 million rows took about 9.1 hours.
// Prefer the DataFrame approach from the post "spark-shell read/write HBase, DataFrame method" (link below); a rough sketch follows.
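As a rough sketch only: the hbase-spark jar loaded above also ships a Spark SQL DefaultSource, which is the basis of that DataFrame approach. The option names (hbase.table, hbase.columns.mapping), the KEY_FIELD column name, and the mapping string below are assumptions based on that module's published examples, not taken from the linked post:

// Hedged sketch of a DataFrame read through the hbase-spark DefaultSource (assumed API).
// An HBaseContext is created first so the data source can pick up the ZooKeeper settings.
val dfConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
dfConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
new org.apache.hadoop.hbase.spark.HBaseContext(sc, dfConf)
// sqlContext is predefined in spark-shell
val userDF = sqlContext.read
  .format("org.apache.hadoop.hbase.spark")
  .options(Map(
    "hbase.table" -> "USER",
    "hbase.columns.mapping" ->
      "KEY_FIELD STRING :key, mobile_no STRING cf:mobile_no, user_id STRING cf:user_id, is_validated STRING cf:is_validated"))
  .load()
userDF.filter(userDF("is_validated") === "true").show(5)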
// Write the RDD back with individual Puts, opening one HBase connection per partition.
def putWriteHbase(sc: org.apache.spark.SparkContext, m2cRDD: org.apache.spark.rdd.RDD[(String, String, String)], writeTableName: String = "MOBILE2CMPAYID") = {
  m2cRDD.foreachPartition(iter => {
    val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val cn = org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(hbaseConf)
    val hbaseTable = cn.getTable(org.apache.hadoop.hbase.TableName.valueOf(writeTableName))
    iter.foreach(row => {
      import org.apache.hadoop.hbase.util.Bytes
      val mobile_no: String = row._1
      val user_id: String = row._2
      val is_validated: String = row._3
      // skip the ("-1","-1","-1") placeholder rows produced by readHbase
      if (mobile_no != "-1") {
        val put = new org.apache.hadoop.hbase.client.Put(Bytes.toBytes(mobile_no))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("mobile_no"), Bytes.toBytes(mobile_no))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("user_id"), Bytes.toBytes(user_id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("is_validated"), Bytes.toBytes(is_validated))
        hbaseTable.put(put)
      }
    })
    hbaseTable.close()
    cn.close()
  })
}
val m2cRDD = readHbase(sc, "USER")
putWriteHbase(sc, m2cRDD, "MOBILE2CMPAYID")
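Between single Puts and the HFile route below there is also a bulkPut helper in the same hbase-spark module, which manages the HBase connection for you. A minimal sketch, assuming hbaseBulkPut is available in this build alongside the hbaseBulkLoad helper used later:

// Hedged sketch: write via hbaseBulkPut instead of per-row table.put() inside foreachPartition.
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
val bulkConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
bulkConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
val bulkContext = new HBaseContext(sc, bulkConf)
m2cRDD.filter(_._1 != "-1").hbaseBulkPut(bulkContext, TableName.valueOf("MOBILE2CMPAYID"),
  (row: (String, String, String)) => {
    val put = new Put(Bytes.toBytes(row._1))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("mobile_no"), Bytes.toBytes(row._1))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("user_id"), Bytes.toBytes(row._2))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("is_validated"), Bytes.toBytes(row._3))
    put
  })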
// Bulk-load path: build HFiles with hbaseBulkLoad, then hand them to LoadIncrementalHFiles.
def HFileWriteHbase(sc: org.apache.spark.SparkContext, m2cRDD: org.apache.spark.rdd.RDD[(String, String)], writeTableName: String = "tmp_mobile2cmpayidHFile") = {
  import org.apache.hadoop.hbase.util.Bytes
  val toByteArrays = m2cRDD.map(row => {
    val mobile_no: String = row._1
    val user_id: String = row._2
    val rowkeyBytes = Bytes.toBytes(mobile_no)
    val kvs = List(
      (Bytes.toBytes("mobile_no"), Bytes.toBytes(mobile_no)),
      (Bytes.toBytes("user_id"), Bytes.toBytes(user_id))
    )
    (rowkeyBytes, kvs)
  })
  val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22")
  val hbaseContext = new org.apache.hadoop.hbase.spark.HBaseContext(sc, hbaseConf)
  val tableName = org.apache.hadoop.hbase.TableName.valueOf(writeTableName)
  val stagingFolder = "hdfs:///tmp/wsy/hfile/test"
  import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
  toByteArrays.hbaseBulkLoad(hbaseContext, tableName,
    t => {
      val rowKey = t._1
      // flatten each row into (rowkey, family, qualifier) -> value entries
      val seq = new scala.collection.mutable.ListBuffer[(org.apache.hadoop.hbase.spark.KeyFamilyQualifier, Array[Byte])]()
      for (kv <- t._2) {
        val qualifier = kv._1
        val value = kv._2
        if (Bytes.toString(value) != "-1") {
          val keyFamilyQualifier = new org.apache.hadoop.hbase.spark.KeyFamilyQualifier(rowKey, Bytes.toBytes("cf"), qualifier)
          seq.append((keyFamilyQualifier, value))
        }
      }
      seq.iterator
    },
    stagingFolder)
  // move the generated HFiles into the table's regions
  val load = new org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(hbaseConf)
  load.run(Array(stagingFolder, writeTableName))
}
putWriteHbase(sc, readHbase(sc))
// HFileWriteHbase expects (mobile_no, user_id) pairs, so drop the is_validated field first
HFileWriteHbase(sc, readHbase(sc).map(t => (t._1, t._2)))
Recreate the target table in the HBase shell before the bulk load; the name here must match the writeTableName passed to HFileWriteHbase:
disable 'tmp_mobile2cmpayid'
drop 'tmp_mobile2cmpayid'
create 'tmp_mobile2cmpayid','cf'
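For a load of this size it usually helps to pre-split the table so the generated HFiles land in more than one region; the split points below are placeholders only and should be derived from the real mobile-number rowkey distribution:
create 'tmp_mobile2cmpayid', 'cf', SPLITS => ['134', '150', '170', '186']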
A few points to note:
1. hbaseConf.set("hbase.zookeeper.quorum", "s1sl11,s1ma11,s1sl22") — without this line the job fails at runtime because the ZooKeeper quorum is missing (the error screenshot from the original post is not reproduced here).
2. spark-shell --jars /home/wsy/jars/hbase-spark-1.2.0-cdh5.7.1.jar — without this jar, the classes under the org.apache.hadoop.hbase.spark package cannot be found.
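A quick sanity check for point 2 (not from the original post): if the jar was picked up, the class loads inside spark-shell.

// Throws ClassNotFoundException if hbase-spark-1.2.0-cdh5.7.1.jar is missing from --jars
Class.forName("org.apache.hadoop.hbase.spark.HBaseContext")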