Published 2024-02-06
Kafka Installation

Prerequisite: install the Scala version matching your Kafka build.

I. Cluster installation

1. Deploy ZooKeeper following the ZooKeeper installation guide.

2. Unpack the Kafka tarball and rename the directory to kafka:

tar -zxvf kafka_<version>.tgz -C /home/hadoop/kafka

3. Set the KAFKA_HOME and PATH environment variables:

export KAFKA_HOME=/home/hadoop/kafka
export PATH=$PATH:$KAFKA_HOME/bin

4. Edit config/server.properties:

broker.id=1                        # must differ on each machine and be unique
delete.topic.enable=true           # enable fully deleting topics
controlled.shutdown.enable=false   # whether the controller may shut brokers down automatically
port=9092
host.name=172.31.90.71             # each machine's own address, in IP form
log.dirs=/tmp/kafka-logs           # change to your own directory
zookeeper.connect=172.31.90.111:2181,172.31.90.71:2181,192.168.2.11:2181   # in IP form
zookeeper.connection.timeout.ms=1000000   # connection timeout
kafka.csv.metrics.dir=/tmp/kafka_metrics  # point at your own directory
auto.leader.rebalance.enable=false        # use with caution: disables the leader auto-rebalance scheduler

5. Distribute the installation to the other machines:

scp -r ~/kafka hadoop@slave1:~/
scp -r ~/kafka hadoop@slave2:~/
scp -r ~/kafka hadoop@slave3:~/

6. Start Kafka (on every machine):

nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &

Stop:

/opt/kafka/bin/kafka-server-stop.sh

nohup keeps the process running when the terminal session is closed.
To fix garbled Chinese characters in the console tools, pass an encoding property on the command line, e.g. --property serializer.encoding=UTF-8.

7. Verify Kafka.

Create a topic:

bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 3 --partitions 3 --topic test

Options:
--zookeeper: ZooKeeper address:port
--replication-factor: number of replicas
--partitions: number of partitions
--topic: topic name

List all topics:

bin/kafka-topics.sh --list --zookeeper hadoop1:2181

Describe a topic:
bin/kafka-topics.sh --describe --zookeeper hadoop1:2181 --topic test

Topic:test  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: test  Partition: 0  Leader: 110  Replicas: 110,111,112  Isr: 110,111,112
    Topic: test  Partition: 1  Leader: 111  Replicas: 111,112,110  Isr: 111,112,110
    Topic: test  Partition: 2  Leader: 112  Replicas: 112,110,111  Isr: 112,110,111

Field meanings:
partition: partition ID
leader: the broker id currently serving reads and writes for this partition
replicas: the full replica broker list for this partition
isr: the subset of replicas that are currently alive and in sync

Inspect the Kafka cluster registered in ZooKeeper:

[zk: localhost:2181(CONNECTED) 5] ls /
[admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 6] ls /brokers
[topics, ids]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/ids
[112, 110, 111]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112
[]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions
[2, 1, 0]
[zk: localhost:2181(CONNECTED) 12]
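The Replicas column above follows Kafka's round-robin replica placement across brokers. A minimal sketch of that assignment logic (simplified: real Kafka also randomizes the starting broker index), which reproduces the layout shown for 3 brokers, 3 partitions, and replication factor 3:

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicaAssignment {
    // Round-robin replica placement: partition p gets replication-factor
    // consecutive brokers starting at broker (p mod brokerCount).
    static List<List<Integer>> assign(int[] brokers, int partitions, int replicationFactor) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokers[(p + r) % brokers.length]);
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Matches the describe output: partition 0 -> 110,111,112; 1 -> 111,112,110; 2 -> 112,110,111
        System.out.println(assign(new int[]{110, 111, 112}, 3, 3));
    }
}
```

The first replica in each list is the preferred leader, which is why each broker leads exactly one of the three partitions in the output above.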
8. Stop Kafka:

pkill -9 -f server.properties

II. Calling Kafka from Java

1. Produce data from Java and consume it on the Kafka cluster.
(1) Create a Maven project and add the Kafka client dependency to pom.xml.
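A typical pom.xml dependency for the 0.8-era Scala client API used in the code below might look like this; the artifact and version numbers are assumptions, so match them to your installed Kafka and Scala versions:

```xml
<!-- Hypothetical coordinates: align the Scala suffix and Kafka version with your cluster -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>
```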
(2) Java code: write data into topic test.
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class kafkaProducer extends Thread {

    private String topic;

    public kafkaProducer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        int i = 0;
        while (true) {
            producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181"); // declare the ZooKeeper ensemble
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("metadata.broker.list", "192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094"); // declare the Kafka brokers
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaProducer("test").start(); // uses the topic "test" created on the cluster earlier
    }
}
(3) Consume topic test on the Kafka cluster:

[root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
(4) Start the Java code, then watch the data consumed on the cluster:
message: 0
message: 1
message: 2
message: 3
message: 4
message: 5
message: 6
message: 7
message: 8
message: 9
message: 10
message: 11
message: 12
message: 13
message: 14
message: 15
message: 16
message: 17
message: 18
message: 19
message: 20
message: 21
2. Write the Kafka consumer in Java as well: run kafkaProducer first, then kafkaConsumer, and the consumer receives the producer's data.

Java code:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Receives the data, printing e.g.:
 *   received: message: 10
 *   received: message: 11
 *   received: message: 12
 *   received: message: 13
 *   received: message: 14
 * @author zm
 */
public class kafkaConsumer extends Thread {

    private String topic;

    public kafkaConsumer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // fetch one stream at a time from this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); // the stream that yields each received message
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("received: " + message);
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181"); // declare the ZooKeeper ensemble
        properties.put("group.id", "group1"); // use a group name of its own: if the producer and consumer shared a group, the topic data in that group could not be read
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaConsumer("test").start(); // uses the topic "test" created on the cluster earlier
    }
}
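The producer above sends KeyedMessage objects; in 0.8-era clients a keyed message is routed by the default partitioner, which is essentially a non-negative key hash modulo the partition count. The class below is a self-contained sketch of that idea, not the actual Kafka class:

```java
public class DefaultPartitionerSketch {
    // Roughly what the old default partitioner does: make the key's
    // hash non-negative, then take it modulo the number of partitions.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With the 3-partition topic "test" created above, every key maps to 0, 1, or 2,
        // and the same key always lands on the same partition.
        System.out.println(partitionFor("some-key", 3));
    }
}
```

This is why per-key ordering holds in Kafka: a fixed key deterministically maps to one partition, and ordering is only guaranteed within a partition.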