视图主对象树 -> 转换 ->右键新建 -> 直接快捷键Ctrl + S另存为test.ktr(自定义后缀,这里建议使用.ktr)选中DB连接,操作验证相关数据库是否能正确连接,这里以MySQL数据库为例。
同样创建demo.ktr文件;然后双击“表输入”进行编辑,然后在表输入中设置数据库连接名;然后双击"表输出“进行编辑,并选择批量插入。
创建kjb,并关联demo.ktl表
在不做任何优化的情况下,单表数据迁移10w条大概是100条/秒。
参数 | 参数说明 |
---|---|
defaultFetchSize=10000 | 每次与数据库交互,读多少条数据加入内存中缓存,不设置默认把所有数据读取出来,容易内存溢出(OOM),我这里设置10000,大表CPU性能高建议设置最大50000,不能超过65535 |
cachePrepStmts=true | 是否客户端缓存预处理语句 |
rewriteBatchedStatements=true | 是否开启批量写入,true表示开启,原多条insert变成单条insert执行 |
入层ODS表同一个业务日期数据中包含前一天或者后一天凌晨附近的数据或者丢失当天的变更数据。
数据漂移出现原因
(1)同一条记录的数据抽取时间extract_time明显是晚于另外三个时间的,如果用这个字段切分,ODS某个分区中的数据会包含前一天末尾的数据,并丢失当天末尾的数据。
(2)如果用数据库记录的更新时间modified_time,前台业务系统手工订正数据时可能会遗忘同步更新该时间,导致该抽取的数据被遗漏掉。
(3)另外,由于网络或者系统压力问题,log_time或者modified_time可能会晚于proc_time,导致数据漂移。
(4)如果我们直接使用proc_time时间进行切分,这种情况仅仅对包含一个业务过程的ODS表有效果,如果该表每条记录需要存储多个业务过程,则用proc_time切分会丢失其他发生在当天的业务过程记录。
①获取当天漂移的数据:根据modified_time获取后一天15分钟的数据,并限制多个和业务过程的时间戳为当天,然后根据这些数据按照modified_time升序排序,获取每个数据(主键唯一)首次数据变更的那条记录。
②获取当天未漂移的数据并剔除前一天漂移过来的数据:根据log_time分别冗余前一天最后15分钟的数据和后一天凌晨开始15分钟的数据,并用modified_time过滤非当天数据,并针对每个订单按照log_time进行降序排序,取每个订单当天最后一次数据变更的那条记录。
③将两部分数据根据订单做full join 全外连接,即可得到当天所有数据
车查勘表,车险理赔车辆损失表,车险理赔整案表,理赔报案数据表,保险车险主题表,理赔车辆信息表
在线事务处理系统(OLTP)中的数据通过一定的方式同步到操作数据存储(ODS)中,以便于进行数据分析和报表生成。OLTP数据库是面向业务的,支持高并发、低延迟的事务操作,例如订单、支付、库存等。ODS是面向数据的,存储了业务系统的原始数据或者轻度清洗后的数据,通常是分布式的大数据平台,例如Hive、HBase等。
问题:表的业务数据库越来越大,按周期全量同步的方式会影响处理效率,甚至不可能做到。
解决方案:每次只同步新变更的增量数据,然后与上一个同步周期获得的全量数据进行合
并,从而获得最新版本的全量数据。简而言之,就是增量同步,然后与原数据合并。
当前流行的大数据平台基本都不支持 update 操作 ,现在比较推荐的方式是全外连接( full outer join) +数据全量覆盖重新加载( insert overwrite ),例如日调度,则将当天的增量数据和前一天的全量数据做全外连接,重新加载最新的全量数据。
left outer join会返回右表中与左表匹配的记录或用NULL填充,而left semi join不会返回右表中的任何列,left inner join会返回右表中与左表匹配的记录。
ETL是指提取,转换,加载的过程
- 从源系统提取相关的数据
- 转换数据,对数据进行数据清洗和转换,使其适合分析
- 将数据加载到目标数据库中
1确定数据源,需要确定从哪些源系统进行数据抽取.
2定义数据接口,对每个源文件及系统的每个字段进行详细说明 .
3确定数据抽取的方法: 是主动抽取还是由源系统推送? 是增量抽取还是全量抽取? 是按照每日抽取还是月按照每月抽取?
1基本数据转换:基本转换通过消除错误、清空数据字段或简化数据来提高数据质量。
2数据清理:数据清理可消除错误并将源数据映射到目标数据格式。例如,您可以将空数据字段映射到数字 0,将数据值“Parent”映射到“p”,或将“Child”映射到“C”
3数据去重复:数据清理中的去重复将识别并删除重复记录
4数据格式修订:格式修订会将字符集、测量单位和日期/时间值等数据转换为一致的格式。例如,一家食品公司可能有不同的配方数据库,其成分以千克和磅为单位。ETL 会将所有单位转换为磅。
5高级数据转换:高级转换使用业务规则来优化数据以便于分析。
6推导:推导将业务规则应用于您的数据,以根据现有值计算新值。例如,您可以通过减去费用或通过将每件商品的价格乘以订购的商品数量来计算购买的总成本来将收入转换为利润
7联结:在数据准备中,联结将链接来自不同数据来源的相同数据。例如,您可以通过将不同供应商的采购价值相加,并仅将最终总计存储在目标系统中来找出某件物品的总采购成本
8拆分:您可以在目标系统中将列或数据属性划分为多个列。例如,如果数据来源将客户名称保存为“Jane JohnDoe”,您可以将其拆分为名字、中间名和姓氏
9汇总:汇总通过将大量数据值减少到较小的数据集中来提高数据质量。例如,客户订单发票值可以有许多不同的小金额。您可以通过将给定时间段内的数据相加来汇总数据,以构建客户生命周期价值 (CLV) 指标
10 加密:您可以通过在数据流到目标数据库之前添加加密来保护敏感数据,以遵守数据法律或数据隐私。
1完全加载:来自源的全部数据被转换并移动到数据仓库。完全加载通常发生在您第一次将数据从源系统加载到数据仓库中时。
2增量加载:ETL 工具会定期加载目标系统和源系统之间的增量(或差异)。它会存储最后提取日期以便仅加载在此日期之后添加的记录。有两种方法可以实现增量加载
3流式增量加载:如果您的数据量较小,您可以通过数据管道将持续更改流式传输到目标数据仓库。当数据速度增加到每秒数百万个事件时,您可以使用事件流处理来监控和处理数据流,从而更及时地做出决策。
4批量增量加载:如果您的数据量很大,您可以定期分批收集将负载数据更改。在此设定的时间段内,由于数据同步,源系统或目标系统都不会发生任何操作。
日志类数据都是实时产生的,所以需要尽快将日志以数据流的方式不间断地同步到数据仓库。或者对业务系统产生的数据进行实时处理,比如天猫“双11”的数据大屏,对所产生的交易数据需要实时汇总,实现秒级的数据刷新。
通过解析MySQL的binlog日志来实时获得增量的数据更新,并通过消息订阅模式来实现数据的实时同步。具体来说,就是建立一个体制数据交换中心,通过专门的模块从每台服务器源源不断地读取日志数据,或者解析业务数据库系统的binlog或归档日志,将增量数据以数据流的方式不断同步到日志交换中心,然后通知所有订阅了这些数据的数据仓库系统来获取。
阿里是通过TimeTunnel (TT)消息中间件(具有高性能、实时性、顺序性、高可靠性、高可用性、可扩展性等特点)来实现实时数据同步的。TT是一种基于生产者、消费者和Topic消息标识的消息中间件,将消息数据持久化到 HBase 的高可用、分布式数据交互系统
消息队列可以缓冲数据,避免数据的丢失或者堆积,提高数据的可靠性和安全性。
消息队列可以解耦数据源和数据目标,降低系统的耦合度,提高系统的灵活性和可维护性。
消息队列可以支持多种数据格式和协议,实现异构数据源和数据目标的同步,提高系统的兼容性和扩展性。
消息队列可以支持多对多的数据同步,实现数据的广播和分发,提高系统的效率和性能。
PySpark是Spark的Python API,它允许我们使用Python语言与Spark进行交互和开发。PySpark线程和JVM线程之间的通信和交互的过程如下:
Spark 提供了大量内建函数,它的灵活性让数据工程师和数据科学家可以定义自己的函数。这些函数被称为用户自定义函数(user-defined function,UDF)。UDF分为两种类型:临时函数和永久函数。临时函数只在当前会话中有效,退出后重新连接就无法使用;永久函数则会将UDF信息注册到MetaStore元数据中,可以永久使用。
创建永久函数的步骤如下:
from pyspark.sql.functions import udf
from pes import StringTypedef upper_case(str):return str.upper()upper_case_udf = udf(upper_case, StringType())
复制
ister("upper_case", upper_case_udf)
复制
SELECT upper_case(name) FROM student;
UDF是一种可以在Spark SQL和DataFrame中使用的自定义函数,它可以对输入的数据进行一对一的转换或计算。
- 在编写udf时,要重写evaluate方法,用于对单行数据进行操作,并返回一个结果;
- 在编写udaf时,要重写new_buffer, iterate, merge, terminate四个方法,用于对多行数据进行聚合操作,并返回一个结果;
- 在编写udtf时,要重写process和close两个方法,用于对单行数据进行拆分操作,并返回多行数据;
# 导入必要的包
from pyspark.sql.functions import udf
from pes import LongType
from pyspark.sql import Window
from pyspark.sql.functions import collect_list# 定义一个 udf 函数,输入参数是开始时间和结束时间的数组,输出是总在线时长
@udf(LongType())
def totalOnlineTimeUDF(startTimes, endTimes):# 将字符串转换为时间戳startTimestamps = [int(x) for x in startTimes]endTimestamps = [int(x) for x in endTimes]# 对时间戳进行排序sortedStartTimestamps = sorted(startTimestamps)sortedEndTimestamps = sorted(endTimestamps)# 初始化变量totalOnlineTime = 0 # 总在线时长prevEndTime = 0 # 上一个结束时间# 遍历每个时间段for i in range(len(startTimestamps)):# 如果当前开始时间大于上一个结束时间,说明没有重叠,直接累加时长if sortedStartTimestamps[i] > prevEndTime:totalOnlineTime += sortedEndTimestamps[i] - sortedStartTimestamps[i]else:# 如果当前开始时间小于等于上一个结束时间,说明有重叠,需要判断当前结束时间是否大于上一个结束时间if sortedEndTimestamps[i] > prevEndTime:# 如果当前结束时间大于上一个结束时间,说明有部分时长没有计算,需要累加差值totalOnlineTime += sortedEndTimestamps[i] - prevEndTime# 更新上一个结束时间prevEndTime = sortedEndTimestamps[i]# 返回总在线时长return totalOnlineTime# 读取商品投放信息表
df = ad.table("product_info")# 使用窗口函数按商品分组,收集每个商品的所有开始时间和结束时间
df2 = df.withColumn("start_times", collect_list("activity_start_time").over(Window.partitionBy("sku_id"))).withColumn("end_times", collect_list("activity_end_time").over(Window.partitionBy("sku_id"))).dropDuplicates("sku_id") # 去重,保留每个商品的一行记录# 调用 udf 函数,计算每个商品的总在线时长
df3 = df2.withColumn("total_online_time", totalOnlineTimeUDF("start_times", "end_times"))# 选择需要的列,展示结果
df3.select("sku_id", "total_online_time").show()
Scala 是一门针对 JVM(Java 虚拟机)的静态类型编程语言,它结合了面向对象编程和函数式编程的特性。Scala 的设计目标是解决 Java 语言中的一些问题,并提供更简洁、灵活和可扩展的语言特性。Scala 在处理大规模数据时具有很强的表现力和高性能,并且与 Spark 框架紧密结合,成为 Spark 开发的首选语言。
tidb可以使用pyspark来进行数据分析和处理。pyspark是Spark的Python接口,可以利用Spark的分布式计算能力来处理大规模的数据。tidb已经集成了Spark框架,可以直接使用Spark连接tidb通过写SQL操作数据。tidb也提供了一个pytispark的包,可以在pyspark中使用TiContext类来访问tidb的数据库和表。 要在tidb中使用pyspark,您需要先安装pyspark和pytispark的包。然后,您可以在pyspark中创建一个SparkSession对象,并使用TiContext类来连接tidb的数据库和表。例如,以下是一个简单的代码示例,展示了如何在pyspark中读取tidb的一个表:
# 导入pyspark和pytisparkfrom pyspark.sql import SparkSessionfrom pytispark.pytispark import TiContext# 创建一个SparkSession对象spark = SparkSession.builder.appName("pyspark_tidb").getOrCreate()# 创建一个TiContext对象,连接tidb的数据库ti = TiContext(spark)ti.tidbMapDatabase("test")# 读取tidb的一个表,返回一个DataFrame对象df = spark.sql("select * from test.user")# 显示DataFrame的内容df.show()
要使用pyspark从tidb中读取数据并保存为excel文件,您可以参考以下步骤:
# 导入pyspark和pytisparkfrom pyspark.sql import SparkSessionfrom pytispark.pytispark import TiContext# 创建一个SparkSession对象spark = SparkSession.builder.appName("pyspark_tidb").getOrCreate()# 创建一个TiContext对象,连接tidb的数据库ti = TiContext(spark)ti.tidbMapDatabase("test")# 读取tidb的一个表,返回一个DataFrame对象df = spark.sql("select * from test.user")# 导入pandas和xlsxwriterimport pandas as pdimport xlsxwriter# 将pyspark的DataFrame转换为pandas的DataFramepdf = df.toPandas()# 将pandas的DataFrame保存为excel文件_excel("user.xlsx", engine="xlsxwriter")
TiContext类是pytispark包中的一个类,它可以用来在pyspark中访问tidb的数据库和表。pytispark是一个将TiSpark与PySpark结合的包,它可以让用户在pyspark中使用TiSpark的功能,例如使用Spark SQL来操作tidb的数据。TiContext类是pytispark的核心类,它提供了以下几个方法:
tidbMapDatabase(dbName)
:将tidb的数据库映射到Spark SQL的catalog中,可以使用spark.sql
或者spark.table
来访问tidb的表。tidbMapTable(dbName, tableName)
:将tidb的表映射到Spark SQL的catalog中,可以使用spark.sql
或者spark.table
来访问tidb的表。tidbTable(dbName, tableName)
:返回一个DataFrame对象,表示tidb的表。sql(query)
:在tidb中执行SQL语句,返回一个DataFrame对象,表示查询结果。要使用TiContext类,您需要先创建一个SparkSession对象,并传入一个TiConfiguration对象,用来配置tidb的地址和端口等信息。然后,您可以使用TiContext(spark)
来创建一个TiContext对象,并调用其方法来操作tidb的数据。例如,以下是一个简单的代码示例,展示了如何在pyspark中使用TiContext类:
# 导入pyspark和pytispark
from pyspark.sql import SparkSession
from pytispark.pytispark import TiContext# 创建一个TiConfiguration对象,配置tidb的地址和端口
from tikv_client import TiConfiguration
conf = TiConfiguration("127.0.0.1:2379")# 创建一个SparkSession对象,并传入TiConfiguration对象
spark = SparkSession.builder.appName("pyspark_tidb").config(conf).getOrCreate()# 创建一个TiContext对象
ti = TiContext(spark)# 将tidb的数据库映射到Spark SQL的catalog中
ti.tidbMapDatabase("test")# 使用spark.sql方法执行SQL语句,查询tidb的表
df = spark.sql("select * from test.user")# 显示DataFrame的内容
df.show()
SparkSession对象是Spark 2.0版本引入的一个新概念,它是Spark编程的统一入口,可以用来创建和操作DataFrame,注册和查询表,缓存表,读取parquet文件等。SparkSession对象可以替代SparkContext,SQLContext和HiveContext的功能,简化了Spark编程的复杂性。要创建一个SparkSession对象,可以使用以下的构建器模式:
# 导入pyspark.sql模块
from pyspark.sql import SparkSession# 使用builder方法创建一个SparkSession对象,并设置一些属性
spark = SparkSession.builder.master("local").appName("Word Count").config("fig.option", "some-value").getOrCreate()
上面的代码会创建一个名为Word Count的SparkSession对象,并设置了运行模式和一些配置选项。然后,可以通过spark对象来进行各种操作,例如:
# 从文件中读取数据,返回一个DataFrame对象
df = ("file:///")# 注册一个临时视图,可以用SQL语句来查询
df.createOrReplaceTempView("words")# 使用sql方法执行SQL语句,返回一个DataFrame对象
result = spark.sql("select word, count(*) as count from words group by word")# 显示结果
result.show()
TIDB数据库是一个开源的分布式关系型数据库,它支持在线事务处理(OLTP)和在线分析处理(OLAP)的混合场景,具有以下几个核心特点:
TIDB 是一个基于 Raft 协议的分布式数据库,它使用 Raft 来实现数据的复制和容灾。Raft 是一种分布式一致性算法,它可以保证在集群中的多个节点之间同步数据,并在节点发生故障时自动选举出新的领导者(leader)来继续提供服务。Raft 有以下几个优点:
TIDB 使用 Raft 协议比主从复制好,主要有以下几个原因:
Raft协议是一种分布式一致性算法,它可以保证在集群中的多个节点之间同步数据,并在节点发生故障时自动选举出新的领导者(leader)来继续提供服务。Raft协议的核心内容可以分为以下几个部分:
阿里的云数据库HybridDB是一种基于GreenPlum的分布式大规模并行处理(MPP)数据库,常用于大数据的存储引擎、计算引擎和分析引擎。它和其他的MPP数据库,例如clickhouse,有一些区别和优劣,主要有以下几点:
综上所述,阿里的云数据库HybridDB选用GreenPlum的必要性在于,GreenPlum是一个成熟稳定的MPP数据库,具有丰富的数仓特性和强大的生态系统,能够满足阿里的大数据分析和处理的需求,同时也能保证数据的一致性和可靠性,提供高效的服务。而clickhouse虽然在速度上有优势,但在功能和社区上有不足
减少IO:列式数据库只需要读取查询涉及的列,而不需要读取整张表的所有列2。这样可以减少磁盘的读取次数和数据量,提高IO的效率。例如,如果一张表有10列,每列占用1MB的空间,一个查询只需要访问其中的列,那么列式数据库只需要读取2MB的数据,而行式数据库需要读取10MB的数据。
提高压缩比:列式数据库由于每列的数据类型相同,且数据之间的相关性较高,可以采用更高效的压缩算法,如Snappy、Zlib、Zstd等,达到更高的压缩比。例如,如果一列的数据是字符串类型,且有很多重复的值,那么列式数据库可以用字典编码和位图编码等方式,将数据压缩成更小的空间,从而提高压缩比。
降低随机写入和更新的性能:列式数据库由于需要将一行的数据分散存储在不同的列中,对于随机写入和更新的操作,需要同时修改多个列的数据,这会增加磁盘的写入次数和数据量,降低IO的效率5。例如,如果一张表有10列,每列占用1MB的空间,一个更新操作需要修改其中的2列,那么列式数据库需要写入2MB的数据,而行式数据库只需要写入1MB的数据。
压缩可以节约磁盘的空间,基于文本的压缩率可达40%+; 压缩可以增加吞吐量和性能量(减小载入内存的数据量),但是在压缩和解压过程中会增加CPU的开销。所以针对IO密集型的jobs(非计算密集型)可以使用压缩的方式提高性能。 几种压缩算法:
压缩方式 | 压缩后大小 | 压缩速度 | 是否可以分割 |
---|---|---|---|
GZIP | 中 | 中 | 否 |
BZIP2 | 小 | 慢 | 是 |
LZO | 大 | 块 | 是 |
Snappy | 大 | 快 | 是 |
Hive数据表的默认格式,存储方式:行存储。 可以使用Gzip压缩算法,但压缩后的文件不支持split 在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比SequenceFile高几十倍。
Hadoop中有些原生压缩文件的缺点之一就是不支持分割。支持分割的文件可以并行 的有多个mapper程序处理大数据文件,大多数文件不支持可分割是因为这些文件只能从头开始读。Sequence File是可分割的文件格式,支持Hadoop的block级压缩。 Hadoop API提供的一种二进制文件,以key-value的形式序列化到文件中。存储方式:行存储。 sequencefile支持三种压缩选择:NONE,RECORD,BLOCK。Record压缩率低,RECORD是默认选项,通常BLOCK会带来较RECORD更好的压缩性能。 优势是文件和hadoop api中的MapFile是相互兼容的。
存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:
首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低 其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取 数据追加:RCFile不支持任意方式的数据写操作,仅提供一种追加接口,这是因为底层的 HDFS当前仅仅支持数据追加写文件尾部。 行组大小:行组变大有助于提高数据压缩的效率,但是可能会损害数据的读取性能,因为这样增加了 Lazy 解压性能的消耗。而且行组变大会占用更多的内存,这会影响并发执行的其他MR作业。
ORC是一种基于Hive开发的开源列式存储格式,它将数据按照条带(stripe)为单位进行划分和存储。每一个条带包含了若干条记录的所有属性或字段,并且可以对每个属性或字段进行不同的编码和压缩。ORC还提供了轻量级索引(light-weight index)来加速查询过滤。
Parquet是一种基于Thrift开发的开源列式存储格式,它将数据按照块(block)为单位进行划分和存储。每一个块包含了若干条记录的所有属性或字段,并且可以对每个属性或字段进行不同的编码和压缩。Parquet还支持复杂的嵌套数据结构,如数组、列表、映射等。
本文发布于:2024-01-28 09:38:00,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/17064058866497.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |