-- 可以将后面 namenode 换成 datanode,secondarynamenode,resourcemanager,nodemanager
[atguigu@hadoop112 ~]$ hdfs --daemon start namenode
[atguigu@hadoop112 ~]$ hdfs --daemon stop namenode
[atguigu@hadoop112 hadoop-3.1.3]$ ./sbin/stop-dfs.sh -- start
[atguigu@hadoop113 hadoop-3.1.3]$ ./sbin/start-yarn.sh -- stop或者已经配置过环境变量
start-dfs.sh
[atguigu@hadoop112 zookeeper-3.5.7]$ ./bin/zkServer.sh start --stop
[atguigu@hadoop113 zookeeper-3.5.7]$ ./bin/zkServer.sh start
[atguigu@hadoop114 zookeeper-3.5.7]$ ./bin/zkServer.sh start
1)启动客户端 [atguigu@hadoop112 zookeeper-3.5.7]$ ./bin/zkCli.sh
2)显示所有操作命令 [zk: localhost:2181(CONNECTED) 1] help
3)查看当前znode中所包含的内容[zk: localhost:2181(CONNECTED) 2] ls /
4)查看当前节点详细数据[zk: localhost:2181(CONNECTED) 1] ls -s /
5)分别创建2个普通节点[zk: localhost:2181(CONNECTED) 2] create /sanguo "diaochan"[zk: localhost:2181(CONNECTED) 3] create /sanguo/shuguo "liubei"
6)获得节点的值[zk: localhost:2181(CONNECTED) 7] get /sanguo[zk: localhost:2181(CONNECTED) 8] get -s /sanguo
7)创建临时节点create -e /sanguo/wuguo "zhaoyun"
8)创建带序号的节点 (先创建普通节点,再创建带序号节点)[zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao"[zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo "caocao"
9)修改节点数据值[zk: localhost:2181(CONNECTED) 5] set /sanguo/weiguo "caopi"
10)节点的值变化监听 注意:为值变化 在两个hadoop112窗口上执行[zk: localhost:2181(CONNECTED) 9] get -w /sanguo [zk: localhost:2181(CONNECTED) 2] set /sanguo "xishi"
11)节点的子节点变化监听(路径变化)[zk: localhost:2181(CONNECTED) 3] ls -w /sanguo[zk: localhost:2181(CONNECTED) 10] create /sanguo/wangwangguo
12)删除节点[zk: localhost:2181(CONNECTED) 12] delete /sanguo/wangwangguo
13)递归删除节点[zk: localhost:2181(CONNECTED) 14] deleteall /sanguo
14)查看节点状态[zk: localhost:2181(CONNECTED) 18] stat /sanguo
总结:
-- sources配置项:
netcat 从端口连接采集数据.
Exec 针对文件内容采集数据(一般采集日志)
Spooldir 针对目录内文件进行采集(可对文件内采集过数据进行标记)
TAILDIR 可对文件下目录作不同文件名的监控
Avro 一般上个阶段为agent时,用avro采集
kafka 传来为kafka的数据-- Sinks 配置项:
logger 将数据存放到控制台
Hdfs 将数据存储在hdfs上
Avro 一般下个阶段又agent时,用avro传递给下个阶段.
kafka 将kafka的数据输出-- Channels配置项
Memory 一般传送
Replicating(复制)
Multiplexing(多路复用,不同文件传入不同sink中)
kafka 传送为kafka数据
file 将channel数据写入磁盘,而不是保留在内存,更安全.
拦截器1是为了过滤掉脏数据,拦截器2是为了让同一天数据方进同一个文档,根据header的tm设置时间
(注意:需要在112上和113上一起采集数据,因此112和113都需要启动f1)
#Named
a1.sources = r1
a1.channels = c1 #Source
a1.pe = TAILDIR
#创建position.json文件为断点续传
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.pe = com.ck.ETLLogDataInterceptor.ETLLogDataInterceptor$MyBuilder#Channel
a1.pe = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop112:9092,hadoop113:9092
a1.channels.pic = topic_log
#是否要将其解析为flume的event
a1.channels.c1.parseAsFlumeEvent = false#Bind
a1.sources.r1.channels = c1#运行
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/f -n a1 -logger=INFO,console
f1脚本
#!/bin/bash
if [ $# -lt 1 ]
then
echo "请输入参数 {start|stop}"
exit
fifor i in hadoop112 hadoop113
do
case $1 in
start)
ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/f -n a1 -logger=INFO,console 1>/opt/module/flume-1.9.0/logs/flume.log 2>&1 &"
;;
stop)
# 排除带grep的字符串行 xargs 将前面计算作为kill -9 的参数
ssh $i " ps -ef | f | grep -v grep | awk '{print $2}' | xargs -n1 kill -9"
;;
*)
echo "参数错误,请输入参数 {start|stop}"
exit
;;
esac
done
(注意在114上建立)
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1#Source 注意kafkasource会给我们封装even的header的timestamp属性时间,不使用系统时间
#因为上传也消耗时间,所以,可能会产生误差,而使用数据本身ts时间,则能实现精准上传
a1.pe = org.apache.flume.source.kafka.KafkaSource
#只是为了防止112的kafka故障,从而能从113进入kafka
a1.sources.r1.kafka.bootstrap.servers = hadoop112:9092,hadoop113:9092
a1.sources.up.id = gmall
a1.sources.pics = topic_log
a1.sources.r1.batchSize = 1000
#若batchsize没有装满,则最大等10s传送
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.useFlumeEventFormat = false #Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.pe = com.ck.LogHeaderTimeStampInterceptor.LogHeaderTimeStampInterceptor$MyBuilder#Channel
a1.pe = file
#将source传来的数据写入filechannel
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/jobs/filechannel
#维护被sink拿走的数据(也就是被sink拿走后,数据被标记,这些标记存在checkpoint)
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/jobs/checkpoint
#Checkpoints是否备份维护数据(checkpoint),若未true,则backupCheckpointDir为备份路径在otherdisk里面
#a1.channels.c1.useDualCheckpoints= true
#a1.channels.c1.backupCheckpointDir = /otherdisk
#若source往channel放不下数据适合,会发生回滚事务(把source放进去的数据也抛弃,重新source该批数据).
#接上面,设置5s则为在5s内一直尝试放入,而不发生回滚,当超过5s再发生回滚
a1.channels.c1.keep-alive = 5 #Sink
a1.pe = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.filePrefix = log-
#是否按照时间滚动文件夹
a1.sinks.und = false
#多久生成一个新的文件 (若10s内未上传完毕,则hdfs创建一个新文件夹收集继续上传内容)
a1.sinks.llInterval = 10
#设置每个文件的滚动大小
a1.sinks.llSize = 134217728
#文件的滚动与Event数量无关
a1.sinks.llCount = 0
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.deC = lzop#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 #运行
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/f -n a1 -logger=INFO,console
f2脚本
#!/bin/bash
if [ $# -lt 1 ]
then
echo "请输入参数 {start|stop}"
exit
fifor i in hadoop114
do
case $1 in
start)
ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/f -n a1 -logger=INFO,console 1>/opt/module/flume-1.9.0/logs/flume.log 2>&1 &"
;;
stop)
ssh $i " ps -ef | f | grep -v grep | awk '{print $2}' | xargs -n1 kill -9"
;;
*)
echo "参数错误,请输入参数 {start|stop}"
exit
;;
esac
done
-- 启动
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-server-start.sh -daemon config/server.properties-- 停止
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-server-stop.sh
1)查看所有的Topic
[atguigu@hadoop112 kafka_2.11-2.4.1]$kafka-topics.sh --list --bootstrap-server hadoop112:90922)创建Topic topic 本次创建topic名称 partitions分区个数 replication-factor副本个数 3
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-topics.sh --create --bootstrap-server hadoop112:9092 --topic second --partitions 2 --replication-factor 33) 查看topic的详情
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-topics.sh --describe --bootstrap-server hadoop112:9092 --topic second4) 修改Topic 注意:只能增加分区不能减少分区
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-topics.sh --alter --bootstrap-server hadoop112:9092 --topic second --partitions 35) 删除Topic
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-topics.sh --delete --bootstrap-server hadoop112:9092 --topic third
注意
– 查看topic的详情时
Topic: second PartitionCount: 2 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: second Partition: 0 Leader: 3 Replicas: 3,2,4 Isr: 3,2,4Topic: second Partition: 1 Leader: 4 Replicas: 4,3,2 Isr: 4,3,2Topic: 名字 PartitionCount: 分区个数 ReplicationFactor:副本数
Topic: 名字 Partition: 分区序号 Leader: L在那个broker Replicas: R在那几个broker
Isr:活着的R在那几个broker上
1) 生产者
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-console-producer.sh --broker-list hadoop112:9092 -topic second2) 消费者(broker内分区无序,但分区内数据有序)
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-console-consumer.sh --bootstrap-server hadoop112:9092 -topic second
-- 新组从头开始读取数据
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-console-consumer.sh --bootstrap-server hadoop112:9092 -topic second --from-beginning3)消费者组(offset 的 g[组]-t[主题]-p[分区])
-- 在config目录下修改配置文件consumer.properties内的group.id=g112,并在创建时候指定该配置文件
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-console-consumer.sh --bootstrap-server hadoop112:9092 -topic second --group g11
消费者注意
如果默认创建消费者:而不指定组,则默认分配组,每次默认创建的组都会不同kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic second
注意:若同一组的消费者数超过分区数,则会自动挤掉至和分区相同的消费者.
本文发布于:2024-01-29 03:36:36,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170647060112419.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |