纯干货,一步一步完成MySQL到hive全部详细过程
博主大数据集群:CDH6.3.2
利用阿里开源项目canal+Linkedin 的开源项目 Camus
项目地址:
说明:本文更新时canal发行版为1.1.6
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
OS:CentOS7
jdk:jdk11【因为涉及授权问题,生产环境可以考虑openjdk】
MySQL:用于存储配置和节点等相关数据
zookeeper
kafka【可选,根据canal官方支持的mq自行配置,此处依据我们现状选择kafka】
关于以上环境依赖,请自行配置,本文默认已有相关环境,不做赘述。
版本地址:
版主使用的是v1.1.6
考虑到集群的搭建及运维高效性,因此选择admin+server的搭建方式
1.将下载好的canal.admin-1.1.上传至服务器适当目录下【此处我是测试环境,因此直接使用root,生产环境建议使用专用账户】
[root@py3build ~]# mkdir -p /opt/canal/admin[root@py3build ~]# tar -zxvf canal.admin-1.1. -C /opt/canal/admin[root@py3build ~]# cd /opt/canal/admin[root@py3build admin]# ll
total 8
drwxr-xr-x. 2 root root 93 Sep 21 13:53 bin
drwxr-xr-x. 3 root root 156 Sep 21 13:03 conf
drwxr-xr-x. 2 root root 4096 Sep 21 13:03 lib
drwxrwxrwx. 2 root root 23 Sep 21 13:03 logs
2.修改配置
[root@py3build admin]# vim lserver:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 192.168.1.111:3306 #用于canal存储配置和节点等相关数据database: canal_managerusername: canalpassword: canaldriver-class-name: sql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin
配置中的adminUser/adminPasswd是用来和canal-server做链接校验的,后边配置canal-server时会做说明
3.初始化元数据库
[root@py3build admin]# mysql -h192.168.1.111 -uroot -pmysql>source conf/canal_manager.sql
初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化
canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql
如果本机没有mysql,需要安装一个mysql客户端
我的mysql是5.7的,所以安装client方式如下:
[root@py3build admin]# rpm -ivh //arch.rpm [root@py3build admin]# yum search mysql [root@py3build admin]# yum install mysql-community-client.x86_64 -y [root@py3build admin]# [root@py3build admin]#
如果报错:
mysql-community-libs-compat-5.7.39-1.el7.x86_64.rpm 的公钥尚未安装失败的软件包是:mysql-community-libs-compat-5.7.39-1.el7.x86_64 GPG 密钥配置为:file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql
执行一下
root@py3build admin]# rpm --import [root@py3build admin]# yum install mysql-community-client.x86_64 -y
4.创建MySQL用户
初始化数据库并没有初见默认链接的用户,我们可以根据自己的需要创建一个用户,对canal_manager有读写权限,对应配置文件中的username和password
5.启动
[root@py3build admin]# sh bin/startup.sh
查看admin日志
[root@py3build admin]# tail logs/admin.log
......
2022-09-21 13:53:50.600 [main] INFO http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-09-21 13:53:50.603 [main] INFO at.util.NioSelectorPool - Using a shared selector for servlet write/read
2022-09-21 13:53:50.613 [main] INFO o.s.at.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-09-21 13:53:50.616 [main] INFO canal.admin.CanalAdminApplication - Started CanalAdminApplication in 3.329 seconds (JVM running for 3.997)
2022-09-21 13:53:51.399 [http-nio-8089-exec-1] INFO o.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring FrameworkServlet 'dispatcherServlet'
2022-09-21 13:53:51.400 [http-nio-8089-exec-1] INFO org.springframework.web.servlet.DispatcherServlet - FrameworkServlet 'dispatcherServlet': initialization started
2022-09-21 13:53:51.408 [http-nio-8089-exec-1] INFO org.springframework.web.servlet.DispatcherServlet - FrameworkServlet 'dispatcherServlet': initialization completed in 8 ms
此时代表canal-admin已经启动成功,可以通过 192.168.1.11:8089 访问,默认密码:admin/123456
6.创建集群
(1)canal.zkServers =
(2)kafka.bootstrap.servers =
(3)canal.instance.l = classpath:l
同时因为没有启用kerberos,所以把kafka.kerberos.*进行了注释【可选,不注释掉会报WARN】
#able = false
#kafka.kerberos.krb5.file = “…/conf/f”
#kafka.kerberos.jaas.file = “…/conf/f”
其他参数大家根据自己的需要酌情修改,修改完后点击保存
1.将下载好的canal.deployer-1.1.上传至服务器适当目录下【此处我是测试环境,因此直接使用root,生产环境建议使用专用账户】
[root@py3build ~]# mkdir -p /opt/canal/deployer[root@py3build ~]# tar -zxvf canal.deployer-1.1. -C /opt/canal/deployer[root@py3build ~]# cd /opt/canal/deployer[root@py3build deployer]# ll
total 4
drwxr-xr-x. 2 root root 93 Sep 21 13:56 bin
drwxr-xr-x. 6 root root 149 Sep 21 13:56 conf
drwxr-xr-x. 2 root root 4096 Sep 21 11:30 lib
drwxrwxrwx. 4 root root 45 Sep 21 13:56 logs
drwxrwxrwx. 2 root root 235 Aug 11 10:52 plugin
2.修改配置
[root@py3build deployer]# vim conf/canal_local.properties # register ip
ister.ip =# canal admin config
canal.admin.manager = 192.168.1.11:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
ister.auto = true
ister.cluster = test
ister.name = mysql-02
针对canal.admin.passwd,这里默认做了密码加密处理,这里的passwd是一个密文,和canal-admin里l里的密码原文做对应.
密文的生成方式,请登录mysql,执行如下密文生成sql即可(记得去掉第一个首字母的星号)
select password('admin')+-------------------------------------------+
| password('admin') |
+-------------------------------------------+
| *4ACFE3202A5FF5CF467898FC58AAB1D615029441 |
+-------------------------------------------+# 如果遇到mysql8.0,可以使用select upper(sha1(unhex(sha1('admin'))))
请注意几点:
(1)这个密码方式,同样对于canal.user.passwd有效 (1.1.4新增的,用于控制用户访问canal-server的订阅binlog的ACL机制)
(2)canal.admin.user/canal.admin.passwd,这是一个双向认证,canal-server会以这个密文和canal-admin做请求,同时canal-admin也会以密码原文生成加密串后和canal-server进行admin端口链接,所以这里一定要确保这两个密码内容的一致性
canal admin 的 l 里面定义了账号密码明文 canal.adminUser:canal.adminPasswd
canal server 的 conf/canal_local.properties 里面定义了账号密码密文 canal.admin.user:canal.admin.passwd
双向认证: canal server 向 canal admin 注册的时候会以密码密文做认证, canal admin 对 canal server 做连通性测试的时候也会将密码明文加密之后做认证 (连通性测试失败的时候,canal admin web 会显示对应的 canal server 处于 “断开” 状态)
2.ister.auto:自动注册的意思,如果没有配置,canal-server 启动后需要自行在 canal-server 上面添加
3.ister.cluster:这个配置如果不写代表当前的 canal-server 是一个单机节点,如果添加的名字在 canal-admin 上面没有提前注册,canal-server 启动时会报错
4.ister.name:注册到 canal admin 上server的名字,唯一有意义即可
3.启动
目前conf下会包含canal.properties/canal_local.properties两个文件,考虑历史版本兼容性,默认配置会以canal.properties为主,如果要启动为对接canal-admin模式,可以有两种方式
sh bin/startup.sh local
启动后,我们可以在admin的web界面上看到已经注册的server
使用集群模式注册的server,你会发现点击配置会提示【集群模式Server不允许单独变更配置,请在集群配置变更】
说明集群模式下的server不允许单独修改配置,所有配置统一走集群的主配置。
按照以上步骤可以部署多台server以保证集群的高可用。
在上面canal-server和canal-admin都配置完之后,我们就可以创建对应的实例进行数据的操作
1.点击instance管理–>新建Instance–>载入模板
主要需要修改的配置点有【官方原文】:
(1)canal.instance.master.address:需要采集的mysql主库
(2)canal.instance.dbUsername:采集数据库的用户名,给予此用户slave相关权限
(3)canal.instance.dbPassword:采集数数据库密码
(4)canal.:要采集的库表,支持正则表达式
(5)pic:推送的kafka的topic名称
(6)canal.mq.dynamicTopic:针对库名或者表名发送动态topic,支持正则表达式
canal.mq.dynamicTopic 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔
例子1:test\.test 指定匹配的单表,发送到以test_test为名字的topic上
例子2:.*\..* 匹配所有表,则每个表都会发送到各自表名的topic上
例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
例子4:test\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的pic值
为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table
例子1: test:test\.test 指定匹配的单表,发送到以test为名字的topic上
例子2: test:.*\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
例子4:testA:test\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
例子5:test0:test,test1:test1\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\.test1的表发送到对应的test1的topic下,其余的表发送到默认的pic值
大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力
正常情况下我们一般选择使用canal.mq.dynamicTopic,而将pic注释掉
之后填写Instance名称,并选择相应集群,点击保存
2.启动
在操作中点击启动,状态变更为【启动】后。点击操作–>日志,刷新查看日志是否有报错
我们集群中有两台服务器,分别是mysql-01、mysql-02
我们可以看到当前instence挂载的所属主机是mysql-01
我们手动停止mysql-01
查看instance,也停了
然后再刷新一下,发现instance自动启动并挂载到mysql-02下了
项目地址:
项目说明:原版Camus是linkedIn开源的,但是后来合并到gobblin中了,gobblin功能相对强大一些,而我们因为只需要从kafka写hdfs这一个功能,所以就选择了Camus,这个版本是Confluent维护的镜像版本,虽然现在也不更新了,但是维护的时间比原版长。
Camus的简介大家去看GitHub的官方介绍吧,毕竟全英文,没那么高的水平全翻译过来
1.从GitHub上获取源代码,pom里的版本没把握的话(需要仔细研究源代码)就不用动了,我的CDH6.3.2集群使用正常
2.自定义binlog落地方式【可选】
因为canal写入kafka的binlog比较复杂,可能并不完全是我们希望的数据格式,因此可以进行一下二次开发,在HDFS落盘的时候直接写我们希望的格式,以下代码来自网络,因本人是运维工程师,不擅长java,因此有擅长java的兄弟可以自行开发或改编
将Camus源码clone到本地后,在com.l.kafkamon下新建一个自定义的CanalBinlogRecordWriterProvider,代码如下
源代码如下:
package com.l.kafkamon;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ders.CamusWrapper;
import com.l.IEtlKey;
import com.l.RecordWriterProvider;
import com.l.kafka.mapred.EtlMultiOutputFormat;
slf4j.Slf4j;
import org.f.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.iopress.CompressionCodec;
import org.apache.hadoop.iopress.DefaultCodec;
import org.apache.hadoop.iopress.GzipCodec;
import org.apache.hadoop.iopress.SnappyCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;@Slf4j
public class CanalBinlogRecordWriterProvider implements RecordWriterProvider {protected String recordDelimiter = null;public static final String ETL_OUTPUT_RECORD_DELIMITER = "d.delimiter";public static final String DEFAULT_RECORD_DELIMITER = "n";private boolean isCompressed = false;private CompressionCodec codec = null;private String extension = "";public CanalBinlogRecordWriterProvider(TaskAttemptContext context) {Configuration conf = Configuration();if (recordDelimiter == null) {recordDelimiter = (ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);}isCompressed = CompressOutput(context);if (isCompressed) {Class<? extends CompressionCodec> codecClass = null;if ("snappy".EtlOutputCodec(context))) {codecClass = SnappyCodec.class;} else if ("gzip".equals((EtlOutputCodec(context)))) {codecClass = GzipCodec.class;} else {codecClass = DefaultCodec.class;}codec = wInstance(codecClass, conf);extension = DefaultExtension();}}static class CanalBinlogRecordWriter extends RecordWriter<IEtlKey, CamusWrapper> {private DataOutputStream outputStream;private String fieldDelimiter;private String rowDelimiter;public CanalBinlogRecordWriter(DataOutputStream outputStream, String fieldDelimiter, String rowDelimiter) {this.outputStream = outputStream;this.fieldDelimiter = wDelimiter = rowDelimiter;}@Overridepublic void write(IEtlKey key, CamusWrapper value) throws IOException, InterruptedException {log.info("IEtlKey key:"String()+" CamusWrapper value: " + String());if (value == null) {return;}String recordStr = (String) Record();JSONObject record = JSON.parseObject(recordStr, Feature.OrderedField);if (String("isDdl").equals("true")) {return;}log.info("record:" + JSONString());JSONArray data = JSONArray("data");if (data != null && data.size() > 0){for (int i = 0; i < data.size(); i++) {JSONObject obj = JSONObject(i);if (obj != null) {StringBuilder fieldsBuilder = new StringBuilder();fieldsBuilder.Long("id"));fieldsBuilder.append(fieldDelimiter);fieldsBuilder.Long("es"));fieldsBuilder.append(fieldDelimiter);fieldsBuilder.Long("ts"));fieldsBuilder.append(fieldDelimiter);fieldsBuilder.String("type"));for (Map.Entry<String, Object> entry : Set()) {fieldsBuilder.append(fieldDelimiter);fieldsBuilder.Value());}fieldsBuilder.append(rowDelimiter);outputStream.String().getBytes());log.info(String()" + String());}}}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {outputStream.close();}}@Overridepublic String getFilenameExtension() {return "";}@Overridepublic RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context,String fileName,CamusWrapper data,FileOutputCommitter committer) throws IOException, InterruptedException {Configuration conf = Configuration();String rowDelimiter = ("d.delimiter", "n");Path path = new WorkPath(), UniqueFile(context, fileName, getFilenameExtension()));FileSystem fs = FileSystem(conf);FSDataOutputStream outputStream = fs.create(path, false);return new CanalBinlogRecordWriter(outputStream, "t", rowDelimiter);}
}
说给不太懂java的兄弟:
在camus-etl-kafka下的l文件中添加如下内容
<!-- .alibaba/fastjson -->
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.51</version>
</dependency>
之后别忘了点击一下【加载maven变更】
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency>
3.执行编译命令
mvn clean package -DskipTests
记住,在没有hadoop环境中编译一定要加上-DskipTests,否则单元测试失败你编译不通过
正常情况下只要你的maven仓库拉取没问题,一般能编译成功
4.将整个camus文件夹全部打包压缩,上传服务器
记住,是整个项目打包,因为我不太懂java,所以就这么干了,要是有东java的兄弟也可以指教一下怎么操作。
5.配置部署
上传至服务器后,解压缩,然后将camus/camus-example/src/main/resource/ 目录下的两个文件【l】、【camus.properties】复制到camus/bin/ 目录下
6.修改配置
因为也是初次使用,所以这里放一个网上整理的
# Kafka brokers kafka的ip
kafka.brokersxx:xxx:xxx:9092
# job名称
camus.job.name=binlog-fetch
# Kafka数据落地到HDFS的位置。Camus会按照topic名自动创建子目录
etl.destination.path=/usr/local/camus/exec/topic
# HDFS上用来保存当前Camus job执行信息的位置,如offset、错误日志等
# base.path是基础路径,其它路径要在base.path之下
ution.base.path=/usr/local/camus/exec
# HDFS上保存Camus job执行历史的位置
ution.history.path=/usr/local/camus/exec/history
# 即l中的fs.defaultFS参数
fs.default.name=hdfs://hadoop-master:9000
# Kafka消息解码器,默认有JsonStringMessageDecoder和KafkaAvroMessageDecoder
# Canal的Binlog是JSON格式的。当然我们也可以自定义解码器
ssage.decoder.class=com.ders.JsonStringMessageDecoder
# 落地到HDFS时的写入器,默认支持Avro、SequenceFile和字符串
# 这里我们采用一个自定义的WriterProvider,代码在后面
# d.writer.provider.class=com.ders.JsonStringMessageDecoder
d.writer.provider.class=com.l.kafkamon.CanalBinlogRecordWriterProvider
# JSON消息中的时间戳字段,用来做分区的
# 注意这里采用Binlog的业务时间,而不是日志时间
ssage.timestamp.field=es
# 时间戳字段的格式
ssage.timestamp.format=unix_milliseconds
# 时间分区的类型和格式,默认支持小时、天,也可以自定义时间
etl.partitioner.class=com.l.kafka.partitioner.TimeBasedPartitioner
etl.pic.sub.dirformat='pt_hour'=YYYYMMddHH
# 拉取过程中MR job的mapper数
mapred.map.tasks=20
# 按照时间戳字段,一次性拉取多少个小时的数据过后就停止,-1为不限制
kafka.max.pull.hrs=-1
# 时间戳早于多少天的数据会被抛弃而不入库
kafka.max.historical.days=3
# 每个mapper的最长执行分钟数,-1为不限制
kafka.max.pull.minutes.per.task=-1
# Kafka topic白名单和黑名单,白名单必填
pics=oms_orders_orders,oms_orders_order_detail
pics=
kafka.client.name=camus
# 设定输出数据的压缩方式,支持deflate、gzip和snappy
mapred.outputpress=false
# dec=gzip
# etl.deflate.level=6
# 设定时区,以及一个时间分区的单位
etl.default.timezone=Asia/Shanghai
etl.output.file.time.partition.mins=60
7.运行
在camus/bin目录下执行:
./camus-run -P camus.properties
示例:
kafka中的数据:
落到hdfs上的数据
只需要将数据load到相应的hive表中,就可以直接处理了
调度任务各个公司根据自己的需要进行调整就行,crontab、oozie、airflow等等,在此不做赘述
本文发布于:2024-02-04 21:07:11,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170716421959615.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |