大数据学习指南
  • README
  • Storm
    • Storm和流处理简介
    • Storm集成Kakfa
    • Storm集群环境搭建
    • Storm编程模型详解
    • Storm核心概念详解
    • Storm三种打包方式对比分析
    • Storm集成Redis详解
    • Storm集成HBase和HDFS
    • Storm单机环境搭建
  • HBase
    • HBase过滤器详解
    • HBase的 SQL 中间层Phoenix
    • HBase常用 Shell 命令
    • HBase系统架构及数据结构
    • HBase集群环境搭建
    • HBase容灾与备份
    • HBase Java API
    • HBase协处理器详解
    • Spring Boot 整合 Mybatis + Phoenix
    • HBase简介
    • HBase单机环境搭建
  • Flink
    • Flink 窗口模型
    • Flink 状态管理与检查点机制
    • Flink核心概念综述
    • Flink开发环境搭建
    • Flink Sink
    • Flink Data Source
    • Flink 中使用 RocksDB 状态后端
    • Flink Transformation
    • Flink Standalone 集群部署
  • Spark
    • Spark SQL
      • Spark SQL JOIN操作
      • DataFrame和Dataset简介
      • Spark SQL 常用聚合函数
      • Structured API基本使用
      • Spark SQL 外部数据源
    • Spark Streaming
      • Spark Streaming 基本操作
      • Spark Streaming 整合 Flume
      • Spark Streaming 整合 Kafka
      • Spark Streaming 简介
    • Spark Core
      • Transformation 和 Action 常用算子
      • Spark累加器与广播变量
      • 基于ZooKeeper搭建Spark高可用集群
      • Spark运行模式与作业提交
      • Spark开发环境搭建
      • 弹性式数据集RDD
      • Spark简介
  • Scala
    • 类和对象
    • 集合类型
    • 隐式转换和隐式参数
    • 流程控制语句
    • 继承和特质
    • 函数 & 闭包 & 柯里化
    • Scala数组
    • Scala基本数据类型和运算符
    • 模式匹配
    • Scala List & Set
    • Scala简介及开发环境配置
    • 类型参数
    • Scala Map & Tuple
  • Hive
    • Hive实现WordCount详解
    • Hive常用DDL操作
    • Hive视图和索引
    • Linux环境下Hive的安装部署
    • HiveCLI和Beeline命令行的基本使用
    • Hive常用DML操作
    • Hive分区表和分桶表
    • Hive简介及核心概念
    • Hive数据查询详解
    • Hive SQL的编译过程
  • Hadoop
    • 分布式计算框架—MapReduce
    • HDFS Java API 的使用
    • Hadoop单机环境搭建
    • HDFS常用Shell命令
    • Hadoop极简入门
    • MapReduce编程模型和计算框架架构原理
    • 基于Zookeeper搭建Hadoop高可用集群
    • Hadoop集群环境搭建
    • 集群资源管理器—YARN
    • Hadoop分布式文件系统—HDFS
  • 前言
    • 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink
由 GitBook 提供支持
在本页
  • 1. 部署模式
  • 2. 单机模式
  • 2.1 安装部署
  • 2.2 作业提交
  • 2.3 停止作业
  • 2.4 停止 Flink
  • 3. Standalone Cluster
  • 3.1 前置条件
  • 3.2 搭建步骤
  • 3.3 可选配置
  • 4. Standalone Cluster HA
  • 4.1 前置条件
  • 4.2 搭建步骤
  • 4.3 常见异常
  • 5. 参考资料

这有帮助吗?

  1. Flink

Flink Standalone 集群部署

上一页Flink Transformation下一页Spark

最后更新于4年前

这有帮助吗?

转载:

1. 部署模式

Flink 支持使用多种部署模式来满足不同规模应用的需求,常见的有单机模式,Standalone Cluster 模式,同时 Flink 也支持部署在其他第三方平台上,如 YARN,Mesos,Docker,Kubernetes 等。以下主要介绍其单机模式和 Standalone Cluster 模式的部署。

2. 单机模式

单机模式是一种开箱即用的模式,可以在单台服务器上运行,适用于日常的开发和调试。具体操作步骤如下:

2.1 安装部署

1. 前置条件

Flink 的运行依赖 JAVA 环境,故需要预先安装好 JDK,具体步骤可以参考:

2. 下载 & 解压 & 运行

Flink 所有版本的安装包可以直接从其进行下载,这里我下载的 Flink 的版本为 1.9.1 ,要求的 JDK 版本为 1.8.x +。 下载后解压到指定目录:

tar -zxvf flink-1.9.1-bin-scala_2.12.tgz  -C /usr/app

不需要进行任何配置,直接使用以下命令就可以启动单机版本的 Flink:

bin/start-cluster.sh

3. WEB UI 界面

Flink 提供了 WEB 界面用于直观的管理 Flink 集群,访问端口为 8081:

Flink 的 WEB UI 界面支持大多数常用功能,如提交作业,取消作业,查看各个节点运行情况,查看作业执行情况等,大家可以在部署完成后,进入该页面进行详细的浏览。

2.2 作业提交

启动后可以运行安装包中自带的词频统计案例,具体步骤如下:

1. 开启端口

nc -lk 9999

2. 提交作业

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999

3. 输入测试数据

a a b b c c c a e

4. 查看控制台输出

可以通过 WEB UI 的控制台查看作业统运行情况:

也可以通过 WEB 控制台查看到统计结果:

2.3 停止作业

可以直接在 WEB 界面上点击对应作业的 Cancel Job 按钮进行取消,也可以使用命令行进行取消。使用命令行进行取消时,需要先获取到作业的 JobId,可以使用 flink list 命令查看,输出如下:

[root@hadoop001 flink-1.9.1]# ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

获取到 JobId 后,就可以使用 flink cancel 命令取消作业:

bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc

2.4 停止 Flink

命令如下:

bin/stop-cluster.sh

3. Standalone Cluster

Standalone Cluster 模式是 Flink 自带的一种集群模式,具体配置步骤如下:

3.1 前置条件

使用该模式前,需要确保所有服务器间都已经配置好 SSH 免密登录服务。这里我以三台服务器为例,主机名分别为 hadoop001,hadoop002,hadoop003 , 其中 hadoop001 为 master 节点,其余两台为 slave 节点,搭建步骤如下:

3.2 搭建步骤

修改 conf/flink-conf.yaml 中 jobmanager 节点的通讯地址为 hadoop001:

jobmanager.rpc.address: hadoop001

修改 conf/slaves 配置文件,将 hadoop002 和 hadoop003 配置为 slave 节点:

hadoop002
hadoop003

将配置好的 Flink 安装包分发到其他两台服务器上:

 scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app
 scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app

在 hadoop001 上使用和单机模式相同的命令来启动集群:

bin/start-cluster.sh

此时控制台输出如下:

启动完成后可以使用 Jps 命令或者通过 WEB 界面来查看是否启动成功。

3.3 可选配置

除了上面介绍的 jobmanager.rpc.address 是必选配置外,Flink h还支持使用其他可选参数来优化集群性能,主要如下:

  • jobmanager.heap.size:JobManager 的 JVM 堆内存大小,默认为 1024m 。

  • taskmanager.heap.size:Taskmanager 的 JVM 堆内存大小,默认为 1024m 。

  • taskmanager.numberOfTaskSlots:Taskmanager 上 slots 的数量,通常设置为 CPU 核心的数量,或其一半。

  • parallelism.default:任务默认的并行度。

  • io.tmp.dirs:存储临时文件的路径,如果没有配置,则默认采用服务器的临时目录,如 LInux 的 /tmp 目录。

4. Standalone Cluster HA

上面我们配置的 Standalone 集群实际上只有一个 JobManager,此时是存在单点故障的,所以官方提供了 Standalone Cluster HA 模式来实现集群高可用。

4.1 前置条件

在 Standalone Cluster HA 模式下,集群可以由多个 JobManager,但只有一个处于 active 状态,其余的则处于备用状态,Flink 使用 ZooKeeper 来选举出 Active JobManager,并依赖其来提供一致性协调服务,所以需要预先安装 ZooKeeper 。

另外在高可用模式下,还需要使用分布式文件系统来持久化存储 JobManager 的元数据,最常用的就是 HDFS,所以 Hadoop 也需要预先安装。关于 Hadoop 集群和 ZooKeeper 集群的搭建可以参考:

4.2 搭建步骤

修改 conf/flink-conf.yaml 文件,增加如下配置:

# 配置使用zookeeper来开启高可用模式
high-availability: zookeeper
# 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址
high-availability.zookeeper.quorum: hadoop003:2181
# 在zookeeper上存储flink集群元信息的路径
high-availability.zookeeper.path.root: /flink
# 集群id
high-availability.cluster-id: /standalone_cluster_one
# 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息
high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery

修改 conf/masters 文件,将 hadoop001 和 hadoop002 都配置为 master 节点:

hadoop001:8081
hadoop002:8081

确保 Hadoop 和 ZooKeeper 已经启动后,使用以下命令来启动集群:

bin/start-cluster.sh

此时输出如下:

可以看到集群已经以 HA 的模式启动,此时还需要在各个节点上使用 jps 命令来查看进程是否启动成功,正常情况如下:

只有 hadoop001 和 hadoop002 的 JobManager 进程,hadoop002 和 hadoop003 上的 TaskManager 进程都已经完全启动,才表示 Standalone Cluster HA 模式搭建成功。

4.3 常见异常

如果进程没有启动,可以通过查看 log 目录下的日志来定位错误,常见的一个错误如下:

2019-11-05 09:18:35,877 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint      
- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
.......
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file 
system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no 
Hadoop file system to support this scheme could be loaded.
.....
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in 
the classpath/dependencies.
......

下载完成后,将该 JAR 包上传至所有 Flink 安装目录的 lib 目录即可。

5. 参考资料

2020-10-25-KB52oS

该 JAR 包的源码可以在 Flink 官方的 GitHub 仓库中找到,地址为 : ,可选传参有 hostname, port,对应的词频数据需要使用空格进行分割。

2020-10-25-5DoJfd
2020-10-25-71ENir
2020-10-25-5Bqv4x

更多配置可以参考 Flink 的官方手册:

2020-10-25-SbIvK2
2020-10-25-03WvVi

可以看到是因为在 classpath 目录下找不到 Hadoop 的相关依赖,此时需要检查是否在环境变量中配置了 Hadoop 的安装路径,如果路径已经配置但仍然存在上面的问题,可以从 下载对应版本的 Hadoop 组件包:

2020-10-25-VaU7lt

Flink Standalone 集群部署
Linux 环境下 JDK 安装
官网
SocketWindowWordCount
Configuration
Hadoop 集群环境搭建
Zookeeper 单机环境和集群环境搭建
Flink 官网
Standalone Cluster
JobManager High Availability (HA)