Learning Big Data with Docker: Flink

Note: this post is based on WSL2 & Docker. If you cannot reproduce the results, check for environment differences first. If you are using Ubuntu, a virtual machine, etc., pay particular attention to the hostname discussed below.

This post focuses on the hands-on steps; installing Docker on WSL2 and the underlying theory are covered elsewhere.

1. Setting Up a Flink Cluster

1.1 Pulling the Image

First, pull the Flink image:

docker pull flink	# pull the image
docker images		# list local images
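A bare `docker pull flink` fetches whatever `latest` points to, which may not match the Flink 1.11.1 client libraries used later in this post. A sketch of pinning a version instead (the tag name is an assumption of mine; check Docker Hub for the tags that actually exist):

```shell
# Pin an explicit tag so the cluster matches the client libraries you compile against
docker pull flink:1.11.1-scala_2.12   # example tag; verify it exists on Docker Hub
docker images flink                   # confirm which flink tags are present locally
```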

1.2 Building the Cluster

We can run a JobManager or a TaskManager container directly, as follows:

docker run --name flink_jobmanager -d -t flink jobmanager	# JobManager
docker run --name flink_taskmanager -d -t flink taskmanager	# TaskManager
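Run this way, the two containers sit on Docker's default bridge network and the TaskManager cannot resolve the JobManager by name. A sketch of wiring them together manually (the network name `flink-net` is my choice; `JOB_MANAGER_RPC_ADDRESS` is assumed to be honored by the image's entrypoint, as in the compose file below):

```shell
# Put both containers on a user-defined network so the JobManager resolves by name
docker network create flink-net
docker run -d --name flink_jobmanager --network flink-net \
  -p 8081:8081 \
  -e JOB_MANAGER_RPC_ADDRESS=flink_jobmanager \
  flink jobmanager
docker run -d --name flink_taskmanager --network flink-net \
  -e JOB_MANAGER_RPC_ADDRESS=flink_jobmanager \
  flink taskmanager
```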

Here, though, we bring up the whole cluster with Docker Compose (see the Docker Compose documentation for an introduction):

First create a folder to hold the yml file. Here I create a docker-flink folder under my WSL2 home directory and add a docker-compose.yml file with the following content:

version: "2.1"
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

For example, my file looks like this:

version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

With the file in place, start the cluster from that directory:

docker-compose up -d

Open localhost:8081 in a browser to see the Flink web UI:


You can also inspect the cluster in the Docker Dashboard, which shows 1 JobManager and 1 TaskManager. To add more TaskManagers, run:

docker-compose scale taskmanager=<N>
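Alternatively, the replica count can live in the compose file itself. A sketch (the `scale` key requires compose file format 2.2 or later, so the `version` line would need bumping from "2.1"):

```yaml
# Excerpt of docker-compose.yml: pin the TaskManager count (file format >= 2.2)
services:
  taskmanager:
    # ...same settings as before...
    scale: 3   # start three TaskManagers with a plain `docker-compose up -d`
```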

2. WordCount in Java

2.1 Maven

    <properties>
        <flink.version>1.11.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2.2 Java Coding

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author Creek
 */
@SuppressWarnings("serial")
public class WordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;

        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            // Note: the WSL2 hostname is not localhost; run ifconfig inside WSL2 to get it
//            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            hostname = params.has("hostname") ? params.get("hostname") : "172.31.61.151";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                .keyBy("word")
                .timeWindow(Time.seconds(5))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
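To see what the flatMap/keyBy/reduce pipeline computes within one window, here is a plain-JDK sketch with no Flink dependency (the class name `WordCountSketch` and method `countWords` are mine, not part of the job above):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK model of one five-second window: tokenize each line with the same
// "\\s" regex as the flatMap, then sum counts per word like the reduce step.
public class WordCountSketch {

    public static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {   // same tokenizer as the Flink job
                if (word.isEmpty()) continue;         // split("\\s") can yield empty tokens
                counts.merge(word, 1L, Long::sum);    // the "reduce": a.count + b.count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> window = new ArrayList<>();
        window.add("to be or not to be");
        // prints: to : 2, be : 2, or : 1, not : 1 (insertion order)
        countWords(window).forEach((w, c) -> System.out.println(w + " : " + c));
    }
}
```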

Here we configure the program args (--hostname and --port) as follows:

2.3 Listening on Port 9000

In WSL2:

netcat -l 9000	# or: nc -l 9000
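If netcat is unavailable, a minimal JDK program can play the same role. This is a sketch of mine (class `TextServer` and helper `pump` are not part of the tutorial): it accepts one client and forwards stdin to it line by line, which matches how `socketTextStream` splits records on "\n".

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Minimal stand-in for `nc -l 9000`: one client, newline-delimited text.
public class TextServer {

    // Copy lines from `in` to `out`; each line becomes one record for Flink.
    static void pump(BufferedReader in, PrintWriter out) throws IOException {
        String line;
        while ((line = in.readLine()) != null) {
            out.println(line);
        }
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            pump(new BufferedReader(new InputStreamReader(System.in)), out);
        }
    }
}
```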

Run the main function, then type a few words into the terminal:

The result looks like this:

Note that the output you see at this point is not produced by the Flink cluster in Docker: when the job is launched from the IDE, getExecutionEnvironment() creates a local environment, so even if you shut down the Docker Flink cluster you will still see results.

Next we package the code into a jar and run it on the cluster.

3. Packaging the Jar and Submitting to the Cluster

Build the jar in IDEA, then locate it (right-click the jar and open its file location).

Submit it in the Flink web UI:

Fill in the entry class and the program arguments, then click Submit.


The job is now shown as running:



Published: 2024-02-02 07:47:50

Link: https://www.4u4v.net/it/170683126942376.html
