kafka安装

阅读: 评论:0

2024年2月6日发(作者:)

kafka安装

kafka安装 kafka安装前提条件:按照对应版本scala一、集群安装1.按照zookeeper安装手册部署zookeeper2.解压缩kafka_ 并修改名称为kafkatar -zxvf kafka_ -C /home/hadoop/kafka3.配置kafka的环境变量KAFKA_HOME、PATHexport KAFKA_HOME=/home/hadoop/kafkaexport PATH=$PATH:$KAFKA_HOME/bin4.修改 config/=1

(每台机器不一样且唯一)=true

(彻底删除topic

开启)=false

(是否容许控制器自动关闭节点)port==172.31.90.71(写每台机器自己IP地址:用IP地址形式)=/tmp/kafka-logs(修改目录)t=172.31.90.111:2181,172.31.90.71:2181,192.168.2.11:2181(用IP地址形式)=1000000

(连接超时等待时间)===ers==/tmp/kafka_metrics

(指定自己的目录)d==false

(慎用:取消自平衡调度器)5.将配置分发到其他机器上scp -r ~/kafka hadoop@slave1:~/scp -r ~/kafka hadoop@slave2:~/scp -r ~/kafka hadoop@slave3:~/6.启动Kafka(每一台)处理汉字乱码问题:命令行中加入参数: --property ng=UTF-8nohup ./kafka/config/ties &例子:启动:nohup /opt/kafka/config/ties &停止:nohup /opt/kafka/config/ties &nohup 关闭窗口不会断开session7.验证kafka创建主题bin/ --create --zookeeper hadoop1:2181 --replication-factor 3 --partitions 3 --topic test说明: --zookeeper:zookeeper地址:端口--replication-factor : 副本数量--partitions:分区数量--topic:主题名称列出所有topic: --list --zookeeper hadoop1:2181 查看主题详细

bin/ --describe --zookeeper hadoop1:2181 --topic test --topic testTopic:test PartitionCount:3 ReplicationFactor:3 Configs: Topic: test Partition: 0 Leader: 110 Replicas: 110,111,112 Isr: 110,111,112 Topic: test Partition: 1 Leader: 111 Replicas: 111,112,110 Isr: 111,112,110 Topic: test Partition: 2 Leader: 112 Replicas: 112,110,111 Isr: 112,110,111说明:{partiton:分区:IDleader:当前负责读写的lead broker idkrelicas:当前partition的所有replication broker listisr:relicas的子集,只包含出于活动状态的broker}去zk上看kafka集群[zk: localhost:2181(CONNECTED) 5] ls /[admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch][zk: localhost:2181(CONNECTED) 6] ls /brokers ---->

查看注册在zk内的kafka[topics, ids][zk: localhost:2181(CONNECTED) 7] ls /brokers/ids[112, 110, 111][zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112[][zk: localhost:2181(CONNECTED) 9] ls /brokers/topics

[test][zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test

[partitions][zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions[2, 1, 0][zk: localhost:2181(CONNECTED) 12]

8、关闭kafka

pkill -9 -f ti二、kafka java调用:1、 java端生产数据, kafka集群消费数据:Java代码

1. 1

创建maven工程,中增加如下:

2.

3.

4. kafka_2.10

5. 0.8.2.0

6.

7.

8.

9. 2 java代码:

向主题test内写入数据

10.

11. import ties;

12. import it;

13.

14. import er;

15. import essage;

16. import erConfig;

17. import Encoder;

18.

19.

20.

21.

22. public class kafkaProducer extends Thread{

23.

24. private String topic;

25.

26. public kafkaProducer(String topic){

27. super();

28. = topic;

29. }

30.

31.

32. @Override

33. public void run() {

34. Producer producer = createProducer();

35. int i=0;

36. while(true){

37. (new KeyedMessage(topic, "message: " + i++));

38. try {

39. (1);

40. } catch (InterruptedException e) {

41. tackTrace();

42. }

43. }

44. }

45.

46. private Producer createProducer() {

47. Properties properties = new Properties();

48. ("t", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk

49. ("", "Encoder");

50. ("", "192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");//

声明kafka broker

51. return new Producer(new ProducerConfig(properties));

52. }

53.

54.

55. public static void main(String[] args) {

56. new kafkaProducer("test").start();//

使用kafka集群中创建好的主题 test

57.

58. }

59.

60. }

61.

62.

63.

64.

65. 3 kafka集群中消费主题test的数据:

66. [root@h2master kafka]# bin/ --zookeeper localhost:2181 --topic test --from-beginnin

67.

68. 4

启动java代码,然后在看集群消费的数据如下:

69.

70. message: 0

71. message: 1

72. message: 2

73. message: 3

74. message: 4

75. message: 5

76. message: 6

77. message: 7

78. message: 8

79. message: 9

80. message: 10

81. message: 11

82. message: 12

83. message: 13

84. message: 14

85. message: 15

86. message: 16

87. message: 17

88. message: 18

89. message: 19

90. message: 20

91. message: 21

2、kafka

使用Java写消费者,这样

先运行kafkaProducer

,在运行kafkaConsumer,即可得到生产者的数据:Java代码

1. import p;

2. import ;

3. import ;

4. import ties;

5.

6. import er;

7. import erConfig;

8. import erIterator;

9. import tream;

10. import erConnector;

11.

12.

13.

14.

15. /**

16. *

接收数据

17. *

接收到: message: 10

18.

接收到: message: 11

19.

接收到: message: 12

20.

接收到: message: 13

21.

接收到: message: 14

22. * @author zm

23. *

24. */

25. public class kafkaConsumer extends Thread{

26.

27. private String topic;

28.

29. public kafkaConsumer(String topic){

30. super();

31. = topic;

32. }

33.

34.

35. @Override

36. public void run() {

37. ConsumerConnector consumer = createConsumer();

38. Map topicCountMap = new HashMap();

39. (topic, 1); //

一次从主题中获取一个数据

40. Map>> messageStreams =

MessageStreams(topicCountMap);

41. KafkaStream stream = (topic).get(0);//

获取每次接收到的这个数据

42. ConsumerIterator iterator = or();

43. while(t()){

44. String message = new String(().message());

45. n("接收到: " + message);

46. }

47. }

48.

49. private ConsumerConnector createConsumer() {

50. Properties properties = new Properties();

51. ("t", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk

52. ("", "group1");//

必须要使用别的组名称,

如果生产者和消费者都在同一组,则不能访问同一组内的topic数据

53. return JavaConsumerConnector(new ConsumerConfig(properties));

54. }

55.

56.

57. public static void main(String[] args) {

58. new kafkaConsumer("test").start();//

使用kafka集群中创建好的主题 test

59.

60. }

61.

62. }

kafka安装

本文发布于:2024-02-06 22:05:46,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170722834662564.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据   主题   集群
留言与评论(共有 0 条评论)
   
验证码:
排行榜

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23