hadoop、zk、kafka命令集合

阅读: 评论:0

hadoop、zk、kafka命令集合

hadoop、zk、kafka命令集合

1 Hadoop命令

1.1点单启动和关闭

-- 可以将后面 namenode 换成 datanode,secondarynamenode,resourcemanager,nodemanager	
[atguigu@hadoop112 ~]$ hdfs --daemon start namenode
[atguigu@hadoop112 ~]$ hdfs --daemon stop namenode

1.2群起集群

[atguigu@hadoop112 hadoop-3.1.3]$ ./sbin/stop-dfs.sh        -- start
[atguigu@hadoop113 hadoop-3.1.3]$ ./sbin/start-yarn.sh      -- stop或者已经配置过环境变量
start-dfs.sh

2 Zookeeper命令

2.1 点单启动zookeeper

[atguigu@hadoop112 zookeeper-3.5.7]$ ./bin/zkServer.sh start	--stop
[atguigu@hadoop113 zookeeper-3.5.7]$ ./bin/zkServer.sh start	
[atguigu@hadoop114 zookeeper-3.5.7]$ ./bin/zkServer.sh start

2.2 zookeeper命令

1)启动客户端			 [atguigu@hadoop112 zookeeper-3.5.7]$ ./bin/zkCli.sh 
2)显示所有操作命令		[zk: localhost:2181(CONNECTED) 1] help
3)查看当前znode中所包含的内容[zk: localhost:2181(CONNECTED) 2] ls /
4)查看当前节点详细数据[zk: localhost:2181(CONNECTED) 1] ls -s /
5)分别创建2个普通节点[zk: localhost:2181(CONNECTED) 2] create /sanguo "diaochan"[zk: localhost:2181(CONNECTED) 3] create /sanguo/shuguo "liubei"
6)获得节点的值[zk: localhost:2181(CONNECTED) 7] get /sanguo[zk: localhost:2181(CONNECTED) 8] get -s /sanguo
7)创建临时节点create -e /sanguo/wuguo "zhaoyun"
8)创建带序号的节点  (先创建普通节点,再创建带序号节点)[zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao"[zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo "caocao"
9)修改节点数据值[zk: localhost:2181(CONNECTED) 5] set /sanguo/weiguo "caopi"
10)节点的值变化监听 注意:为值变化  在两个hadoop112窗口上执行[zk: localhost:2181(CONNECTED) 9] get -w /sanguo          [zk: localhost:2181(CONNECTED) 2] set /sanguo "xishi"	
11)节点的子节点变化监听(路径变化)[zk: localhost:2181(CONNECTED) 3] ls -w /sanguo[zk: localhost:2181(CONNECTED) 10] create /sanguo/wangwangguo
12)删除节点[zk: localhost:2181(CONNECTED) 12] delete /sanguo/wangwangguo
13)递归删除节点[zk: localhost:2181(CONNECTED) 14] deleteall /sanguo
14)查看节点状态[zk: localhost:2181(CONNECTED) 18] stat /sanguo

2.3 flume配置和命令

总结:

-- sources配置项:
netcat	从端口连接采集数据.
Exec   针对文件内容采集数据(一般采集日志)
Spooldir  针对目录内文件进行采集(可对文件内采集过数据进行标记)
TAILDIR   可对文件下目录作不同文件名的监控
Avro	 一般上个阶段为agent时,用avro采集
kafka 	  传来为kafka的数据-- Sinks 配置项:
logger	将数据存放到控制台
Hdfs  	将数据存储在hdfs上
Avro	一般下个阶段又agent时,用avro传递给下个阶段.
kafka 	将kafka的数据输出-- Channels配置项
Memory 一般传送
Replicating(复制)
Multiplexing(多路复用,不同文件传入不同sink中)
kafka 		传送为kafka数据
file 		将channel数据写入磁盘,而不是保留在内存,更安全.

拦截器1是为了过滤掉脏数据,拦截器2是为了让同一天数据方进同一个文档,根据header的tm设置时间

第一层flume

(注意:需要在112上和113上一起采集数据,因此112和113都需要启动f1)

#Named
a1.sources = r1 
a1.channels = c1 #Source
a1.pe = TAILDIR
#创建position.json文件为断点续传
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.pe = com.ck.ETLLogDataInterceptor.ETLLogDataInterceptor$MyBuilder#Channel
a1.pe = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop112:9092,hadoop113:9092
a1.channels.pic = topic_log
#是否要将其解析为flume的event
a1.channels.c1.parseAsFlumeEvent = false#Bind  
a1.sources.r1.channels = c1#运行
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/f -n a1 -logger=INFO,console

f1脚本

#!/bin/bash
if [ $# -lt 1 ]
then
echo "请输入参数	 {start|stop}"
exit
fifor i in hadoop112 hadoop113
do
case $1 in 
start)
ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/f -n a1 -logger=INFO,console 1>/opt/module/flume-1.9.0/logs/flume.log  2>&1 &"
;;
stop)
#                                排除带grep的字符串行        xargs 将前面计算作为kill -9 的参数
ssh $i " ps -ef | f | grep -v grep | awk '{print $2}' | xargs -n1 kill -9"
;;
*)
echo "参数错误,请输入参数	 {start|stop}"
exit
;;
esac
done
第二层flume

(注意在114上建立)

#Named
a1.sources = r1
a1.channels = c1 
a1.sinks = k1#Source   注意kafkasource会给我们封装even的header的timestamp属性时间,不使用系统时间
#因为上传也消耗时间,所以,可能会产生误差,而使用数据本身ts时间,则能实现精准上传
a1.pe = org.apache.flume.source.kafka.KafkaSource
#只是为了防止112的kafka故障,从而能从113进入kafka
a1.sources.r1.kafka.bootstrap.servers = hadoop112:9092,hadoop113:9092
a1.sources.up.id = gmall
a1.sources.pics = topic_log
a1.sources.r1.batchSize = 1000
#若batchsize没有装满,则最大等10s传送
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.useFlumeEventFormat = false #Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.pe = com.ck.LogHeaderTimeStampInterceptor.LogHeaderTimeStampInterceptor$MyBuilder#Channel 
a1.pe = file
#将source传来的数据写入filechannel
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/jobs/filechannel
#维护被sink拿走的数据(也就是被sink拿走后,数据被标记,这些标记存在checkpoint)
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/jobs/checkpoint
#Checkpoints是否备份维护数据(checkpoint),若未true,则backupCheckpointDir为备份路径在otherdisk里面
#a1.channels.c1.useDualCheckpoints= true 
#a1.channels.c1.backupCheckpointDir = /otherdisk 
#若source往channel放不下数据适合,会发生回滚事务(把source放进去的数据也抛弃,重新source该批数据).
#接上面,设置5s则为在5s内一直尝试放入,而不发生回滚,当超过5s再发生回滚
a1.channels.c1.keep-alive = 5 #Sink 
a1.pe = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = false 
a1.sinks.k1.hdfs.filePrefix = log-
#是否按照时间滚动文件夹
a1.sinks.und = false
#多久生成一个新的文件 (若10s内未上传完毕,则hdfs创建一个新文件夹收集继续上传内容)
a1.sinks.llInterval = 10
#设置每个文件的滚动大小
a1.sinks.llSize = 134217728
#文件的滚动与Event数量无关
a1.sinks.llCount = 0
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.deC = lzop#Bind  
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 #运行
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/f -n a1 -logger=INFO,console

f2脚本

#!/bin/bash
if [ $# -lt 1 ]
then
echo "请输入参数	 {start|stop}"
exit
fifor i in hadoop114
do
case $1 in 
start)
ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/f -n a1 -logger=INFO,console 1>/opt/module/flume-1.9.0/logs/flume.log  2>&1 &"
;;
stop)
ssh $i " ps -ef | f | grep -v grep | awk '{print $2}' | xargs -n1 kill -9"
;;
*)
echo "参数错误,请输入参数	 {start|stop}"
exit
;;
esac
done

4 Kafka命令

4.1 单点启动

-- 启动
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-server-start.sh -daemon config/server.properties-- 停止
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-server-stop.sh

4.2 topic-命令行操作

1)查看所有的Topic
[atguigu@hadoop112 kafka_2.11-2.4.1]$kafka-topics.sh --list --bootstrap-server hadoop112:90922)创建Topic    topic 本次创建topic名称  partitions分区个数  replication-factor副本个数 3
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-topics.sh --create --bootstrap-server hadoop112:9092 --topic second --partitions 2 --replication-factor 33) 查看topic的详情
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-topics.sh --describe --bootstrap-server hadoop112:9092 --topic second4) 修改Topic 注意:只能增加分区不能减少分区
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-topics.sh --alter --bootstrap-server hadoop112:9092 --topic second --partitions 35) 删除Topic
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-topics.sh --delete --bootstrap-server hadoop112:9092 --topic third

注意
– 查看topic的详情时

Topic: second	PartitionCount: 2	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: second	Partition: 0	Leader: 3	Replicas: 3,2,4	Isr: 3,2,4Topic: second	Partition: 1	Leader: 4	Replicas: 4,3,2	Isr: 4,3,2Topic: 名字	PartitionCount: 分区个数	ReplicationFactor:副本数	
Topic: 名字	Partition: 分区序号		Leader: L在那个broker	Replicas: R在那几个broker	
Isr:活着的R在那几个broker上 

4.3 命令行操作-生产和消费

1) 生产者
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-console-producer.sh --broker-list hadoop112:9092 -topic second2) 消费者(broker内分区无序,但分区内数据有序)
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-console-consumer.sh --bootstrap-server hadoop112:9092 -topic second
-- 新组从头开始读取数据	
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-console-consumer.sh --bootstrap-server hadoop112:9092 -topic second --from-beginning3)消费者组(offset 的 g[组]-t[主题]-p[分区])
-- 在config目录下修改配置文件consumer.properties内的group.id=g112,并在创建时候指定该配置文件
[atguigu@hadoop112 kafka_2.11-2.4.1]$ kafka-console-consumer.sh --bootstrap-server hadoop112:9092 -topic second --group g11

消费者注意

如果默认创建消费者:而不指定组,则默认分配组,每次默认创建的组都会不同kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic second
注意:若同一组的消费者数超过分区数,则会自动挤掉至和分区相同的消费者.

本文发布于:2024-01-29 03:36:36,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170647060112419.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:命令   hadoop   zk   kafka
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23