Fire框架

阅读: 评论:0

Fire框架

Fire框架

注:本文档描述如何使用fire框架提供的Hbase api进行Hbase读写操作,fire框架内置众多connector,可以很方便的进行spark或flink开发。

HBase 读写

​ HBase对更新和点查具有很好的支持,在实时计算场景下也是应用十分广泛的。为了进一步简化HBase读写api,提高开发效率,fire框架对HBase API进行了深度封装。目前支持3种读写模式,分别是:Java API、Bulk API以及Spark提供的API。另外,fire框架支持在同一个任务中对任意多个hbase集群同时进行读写。

一、HBase集群配置

1.1 定义别名

建议将hbase集群url信息定义成别名,别名定义放到名为common.properties的配置文件中。别名的好处是一处维护到处生效,方便共用,便于记忆。

# 定义hbase集群连接信息别名为test,代码中hbase配置简化为:@HBase("test")
fire.hbase.st     =           zk01:2181,zk02:2181,zk03:2181

1.2 基于注解配置

@HBase("zk01:2181,zk02:2181,zk03:2181")
@HBase2(cluster = "test", scanPartitions = 3, storageLevel = "DISK_ONLY")

1.3 基于配置文件

# 方式一:直接指定zkurl
hbase.cluster                                   =               zkurl
# 方式二:事先定义好hbase别名与url的映射,然后通过别名配置,以下配置定义了别名test与url的映射关系
fire.hbase.st     =               zk01:2181,zk02:2181,zk03:2181
# 通过别名方式引用
hbase.cluster2                              =               test

二、表与JavaBean映射

fire框架通过Javabean与HBase表建立的关系简化读写api:

/*** 对应HBase表的JavaBean** @author ChengLong 2019-6-20 16:06:16*/
@HConfig(multiVersion = true)
public class Student extends HBaseBaseBean<Student> {private Long id;private String name;private Integer age;// 多列族情况下需使用family单独指定private String createTime;// 若JavaBean的字段名称与HBase中的字段名称不一致,需使用value单独指定// 此时hbase中的列名为length1,而不是length@FieldName(family = "data", value = "length1")private BigDecimal length;private Boolean sex;/*** rowkey的构建** @return*/@Overridepublic Student buildRowKey() {wKey = String();return this;}
}

​ 上述代码中定义了名为Student的Javabean,该Javabean需要继承自HBaseBaseBean,并实现buildRowKey方法,这个方法中需要告诉fire框架,rowKey是如何构建的。

​ 通过以上两步即可实现Javabean与HBase表的关系绑定。对于个性化需求,如果需要以多版本的方式进行读写,则需在类名上添加@HConfig(multiVersion = true)注解。如果Javabean中的列名与HBase中的字段名不一致,可以通过@FieldName(family = "data", value = "length1")进行单独指定,当然,列族也可以通过这个注解指定。如果不知道列族名称,则默认只有一个名为info的列族。

目前暂不支持scala语言的class以及case class,仅支持基本的字段数据类型,不支持嵌套的或者复杂的字段类型。

三、spark任务

1.1 java api

/*** 使用HBaseConnector插入一个rdd的数据* rdd的类型必须为HBaseBaseBean的子类*/
def testHbasePutRDD: Unit = {val studentList = wStudentList()val studentRDD = ateRDD(studentList, 2)// 为空的字段不插入studentRDD.hbasePutRDD(this.tableName1)
}/*** 使用HBaseConnector插入一个DataFrame的数据*/
def testHBasePutDF: Unit = {val studentList = wStudentList()val studentDF = ateDataFrame(studentList, classOf[Student])// 每个批次插100条studentDF.hbasePutDF(this.tableName1, classOf[Student])
}/*** 使用HBaseConnector get数据,并将结果以RDD方式返回*/
def testHbaseGetRDD: Unit = {val getList = Seq("1", "2", "3", "5", "6")val getRDD = ateRDD(getList, 2)// 以多版本方式get,并将结果集封装到rdd中返回val studentRDD = this.fire.hbaseGetRDD(this.tableName1, classOf[Student], getRDD)studentRDD.printEachPartition
}/*** 使用HBaseConnector get数据,并将结果以DataFrame方式返回*/
def testHbaseGetDF: Unit = {val getList = Seq("1", "2", "3", "4", "5", "6")val getRDD = ateRDD(getList, 3)// get到的结果以dataframe形式返回val studentDF = this.fire.hbaseGetDF(this.tableName1, classOf[Student], getRDD)studentDF.show(100, false)
}/*** 使用HBaseConnector scan数据,并以RDD方式返回*/
def testHbaseScanRDD: Unit = {val rdd = this.fire.hbaseScanRDD2(this.tableName1, classOf[Student], "1", "6")partition(3).printEachPartition
}/*** 使用HBaseConnector scan数据,并以DataFrame方式返回*/
def testHbaseScanDF: Unit = {val dataFrame = this.fire.hbaseScanDF2(this.tableName1, classOf[Student], "1", "6")partition(3).show(100, false)
}

1.2 bulk api

/*** 使用bulk的方式将rdd写入到hbase*/
def testHbaseBulkPutRDD: Unit = {// 方式一:将rdd的数据写入到hbase中,rdd类型必须为HBaseBaseBean的子类val rdd = wStudentList(), 2)// rdd.hbaseBulkPutRDD(this.tableName2)// 方式二:使用this.fire.hbaseBulkPut将rdd中的数据写入到hbasethis.fire.hbaseBulkPutRDD(this.tableName2, rdd)// 第二个参数指定false表示不插入为null的字段到hbase中// rdd.hbaseBulkPutRDD(this.tableName2, insertEmpty = false)// 第三个参数为true表示以多版本json格式写入// rdd.hbaseBulkPutRDD(this.tableName3, false, true)
}/*** 使用bulk的方式将DataFrame写入到hbase*/
def testHbaseBulkPutDF: Unit = {// 方式一:将DataFrame的数据写入到hbase中val rdd = wStudentList(), 2)val studentDF = ateDataFrame(rdd, classOf[Student])// insertEmpty=false表示为空的字段不插入studentDF.hbaseBulkPutDF(this.tableName1, classOf[Student], keyNum = 2)// 方式二:// this.fire.hbaseBulkPutDF(this.tableName2, studentDF, classOf[Student])
}/*** 使用bulk方式根据rowKey获取数据,并将结果集以RDD形式返回*/
def testHBaseBulkGetRDD: Unit = {// 方式一:使用rowKey读取hbase中的数据,rowKeyRdd类型为Stringval rowKeyRdd = ateRDD(String, 2.toString, 3.toString, 5.toString, 6.toString), 2)val studentRDD = rowKeyRdd.hbaseBulkGetRDD(this.tableName1, classOf[Student], keyNum = 2)studentRDD.foreach(println)// 方式二:使用this.fire.hbaseBulkGetRDD// val studentRDD2 = this.fire.hbaseBulkGetRDD(this.tableName2, rowKeyRdd, classOf[Student])// studentRDD2.foreach(println)
}/*** 使用bulk方式根据rowKey获取数据,并将结果集以DataFrame形式返回*/
def testHBaseBulkGetDF: Unit = {// 方式一:使用rowKey读取hbase中的数据,rowKeyRdd类型为Stringval rowKeyRdd = ateRDD(String, 2.toString, 3.toString, 5.toString, 6.toString), 2)val studentDF = rowKeyRdd.hbaseBulkGetDF(this.tableName2, classOf[Student])studentDF.show(100, false)// 方式二:使用this.fire.hbaseBulkGetDFval studentDF2 = this.fire.hbaseBulkGetDF(this.tableName2, rowKeyRdd, classOf[Student])studentDF2.show(100, false)
}/*** 使用bulk方式进行scan,并将结果集映射为RDD*/
def testHbaseBulkScanRDD: Unit = {// scan操作,指定rowKey的起止或直接传入自己构建的scan对象实例,返回类型为RDD[Student]val scanRDD = this.fire.hbaseBulkScanRDD2(this.tableName2, classOf[Student], "1", "6")scanRDD.foreach(println)
}/*** 使用bulk方式进行scan,并将结果集映射为DataFrame*/
def testHbaseBulkScanDF: Unit = {// scan操作,指定rowKey的起止或直接传入自己构建的scan对象实例,返回类型为DataFrameval scanDF = this.fire.hbaseBulkScanDF2(this.tableName2, classOf[Student], "1", "6")scanDF.show(100, false)
}

1.3 spark api

/*** 基于saveAsNewAPIHadoopDataset封装,将rdd数据保存到hbase中*/
def testHbaseHadoopPutRDD: Unit = {val studentRDD = wStudentList(), 2)this.fire.hbaseHadoopPutRDD(this.tableName2, studentRDD, keyNum = 2)// 方式二:直接基于rdd进行方法调用// studentRDD.hbaseHadoopPutRDD(this.tableName1)
}/*** 基于saveAsNewAPIHadoopDataset封装,将DataFrame数据保存到hbase中*/
def testHbaseHadoopPutDF: Unit = {val studentRDD = wStudentList(), 2)val studentDF = ateDataFrame(studentRDD, classOf[Student])// 由于DataFrame相较于Dataset和RDD是弱类型的数据集合,所以需要传递具体的类型classOf[Type]this.fire.hbaseHadoopPutDF(this.tableName3, studentDF, classOf[Student])// 方式二:基于DataFrame进行方法调用// studentDF.hbaseHadoopPutDF(this.tableName3, classOf[Student])
}/*** 使用Spark的方式scan海量数据,并将结果集映射为RDD*/
def testHBaseHadoopScanRDD: Unit = {val studentRDD = this.fire.hbaseHadoopScanRDD2(this.tableName2, classOf[Student], "1", "6", keyNum = 2)studentRDD.printEachPartition
}/*** 使用Spark的方式scan海量数据,并将结果集映射为DataFrame*/
def testHBaseHadoopScanDF: Unit = {val studentDF = this.fire.hbaseHadoopScanDF2(this.tableName3, classOf[Student], "1", "6")studentDF.show(100, false)
}

四、flink任务

样例代码:

/*** table的hbase sink*/
def testTableHBaseSink(stream: DataStream[Student]): Unit = {ateOrReplaceTempView("student")val table = this.flink.sqlQuery("select id, name, age from student group by id, name, age")// 方式一、自动将row转为对应的JavaBean// 注意:table对象上调用hbase api,需要指定泛型table.hbasePutTable[Student](this.tableName).setParallelism(1)this.fire.hbasePutTable[Student](table, this.tableName2, keyNum = 2)// 方式二、用户自定义取数规则,从row中创建HBaseBaseBean的子类table.hbasePutTable2[Student](this.tableName3)(row => new Student(1L, Field(1).toString, Field(2).Int))// 或者this.fire.hbasePutTable2[Student](table, this.tableName5, keyNum = 2)(row => new Student(1L, Field(1).toString, Field(2).Int))
}/*** table的hbase sink*/
def testTableHBaseSink2(stream: DataStream[Student]): Unit = {val table = this.fire.sqlQuery("select id, name, age from student group by id, name, age")// 方式二、用户自定义取数规则,从row中创建HBaseBaseBean的子类table.hbasePutTable2(this.tableName6)(row => new Student(1L, Field(1).toString, Field(2).Int))// 或者this.flink.hbasePutTable2(table, this.tableName7, keyNum = 2)(row => new Student(1L, Field(1).toString,         Field(2).Int))
}/*** stream hbase sink*/
def testStreamHBaseSink(stream: DataStream[Student]): Unit = {// 方式一、DataStream中的数据类型为HBaseBaseBean的子类// stream.hbasePutDS(this.tableName)this.fire.hbasePutDS[Student](stream, this.tableName8)// 方式二、将value组装为HBaseBaseBean的子类,逻辑用户自定义stream.hbasePutDS2(this.tableName9, keyNum = 2)(value => value)// 或者this.fire.hbasePutDS2(stream, this.tableName10)(value => value)
}/*** stream hbase sink*/
def testStreamHBaseSink2(stream: DataStream[Student]): Unit = {// 方式二、将value组装为HBaseBaseBean的子类,逻辑用户自定义stream.hbasePutDS2(this.tableName11)(value => value)// 或者this.fire.hbasePutDS2(stream, this.tableName12, keyNum = 2)(value => value)
}/*** hbase的基本操作*/
def testHBase: Unit = {// get操作val getList = ListBuffer(HBaseConnector.buildGet("1"))val student = (this.tableName, classOf[Student], getList, 1)if (student != null) JSONString(student))// scan操作val studentList = HBaseConnector.scan(this.tableName, classOf[Student], HBaseConnector.buildScan("0", "9"), 1)if (studentList != null) JSONString(studentList))// delete操作HBaseConnector.deleteRows(this.tableName, Seq("1"))
}

五、多集群读写

fire框架支持同一个任务中对任意多个hbase集群进行读写,首先要在配置文件中以keyNum进行指定要连接的所有hbase集群的zk地址:

@HBase("zk01:2181")
@HBase2("zk02:2181")
@HBase3("zk03:2181")
hbase.cluster=zk01:2181
hbase.cluster3=zk02:2181
hbase.cluster8=zk03:2181

在代码中,通过keyNum参数告诉fire这行代码连接的hbase集群是哪个。注意:api中的keyNum要与配置中的数字对应上。

// insert 操作
studentRDD.hbasePutRDD(this.tableName1)
studentRDD.hbasePutRDD(this.tableName2, keyNum = 3)
studentRDD.hbasePutRDD(this.tableName3, keyNum = 8)
// scan 操作
this.fire.hbaseScanDF2(this.tableName1, classOf[Student], "1", "6")
this.fire.hbaseScanDF2(this.tableName1, classOf[Student], "1", "6", keyNum = 3)

六、@HBase

/*** HBase集群连接信息:hbase.cluster*/
String value() default "";/*** HBase集群连接信息:hbase.cluster,同value*/
String cluster() default "";/*** 列族名称&#lumn.family*/
String family() default "";/*** 每个线程最多insert的记录数:fire.hbase.batch.size*/
int batchSize() default -1;/*** spark引擎:scan hbase后存放到rdd的多少个partition中:fire.hbase.scan.partitions*/
int scanPartitions() default -1;/*** spark引擎:scan后的缓存级别:fire.hbase.storage.level*/
String storageLevel() default "";/*** flink引擎:sink hbase失败最大重试次数:*/
int maxRetries() default -1;/*** WAL等级:hbase.durability*/
String durability() default "";/*** 是否启用表信息缓存,提高表是否存在判断的效率:fire.able*/
boolean tableMetaCache() default true;/*** hbase-client参数,以key=value形式注明*/
String[] config() default "";

七、hbase-client参数

hbase-client参数,可以通过@HBase的config或以f.为前缀的参数去指定:

@HBase(cluster = "test", config = Array[String]("hbase.rpc.timeout=60000ms", "hbase.client.scanner.timeout.period=60000ms"))
f.hbase.rpc.timeout                                       =               60000ms
f.hbase.client.scanner.timeout.period =               60000ms
参数名称引擎含义
fire.hbase.batch.size通用insert的批次大小,用于限制单个task一次最多sink的记录数
通用用于配置列族名称,默认info
flink当插入失败后,重试多少次
hbase.cluster通用所需读写的Hbase集群url或别名
hbase.durability通用Hbase-client中的durability
fire.hbase.storage.levelspark诊断scan后数据的缓存,避免重复scan hbase
fire.hbase.scan.partitionsSpark通过HBase scan后repartition的分区数
fire.hbase.cluster.map.通用hbase集群映射配置前缀
fire.able通用是否开启HBase表存在判断的缓存
fire.able通用是否开启HBase表存在列表缓存的定时更新任务
fire.ists.cache.initialDelay通用定时刷新缓存HBase表任务的初始延迟
fire.ists.cache.period通用定时刷新缓存HBase表任务的执行频率
f.通用hbase java api 配置前缀,支持任意hbase-client的参数

点star是对我们最好的鼓励:

本文发布于:2024-01-28 07:11:15,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/17063970815697.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:框架   Fire
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23