flink安装
Flink的本地 模式启动
./bin/start-local.sh
Flink的standalone 模式简单部署
flink-conf.yaml 文件中进行一下配置:
jobmanager.rpc.address: 10.8.45.10
# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
jobmanager.heap.mb: 4096
# The heap size for the TaskManager JVM
taskmanager.heap.mb: 4096
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4
# Specify whether TaskManager memory should be allocated when starting up (true) or when
# memory is required in the memory manager (false)
taskmanager.memory.preallocate: false
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 4
#==============================================================================
# Web Frontend
#==============================================================================
# The address under which the web-based runtime monitor listens.
#
#jobmanager.web.address: 0.0.0.0
# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
jobmanager.web.port: 8081
启动:
在bin目录下执行 : ./start-cluster.sh 即可
总体来看,基本的集群操作配置与 hadoop spark 等分布式平台部署没啥区别
Flink on yarn 模式简单部署
配置环境变量,把hadoop配置添加进去
export HADOOP_CONF_DIR=/etc/hadoop/conf
参数配置
高可用配置:
如果运行seesion的机器或者nodemanager进程down掉的话会自动切换到别的资源上运行
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
配置kerberos:
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/kafka/kafka.keytab
security.kerberos.login.principal: [email protected]
security.kerberos.login.contexts: Client,KafkaClient"
启动Flink Yarn Session
cd flink
bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -s 32
指定资源池flink,用户dmp_flink
./bin/yarn-session.sh -n 4 -jm 2048 -tm 8190 -s 8 -nm dmp_flink -qu flink
对参数说明:
-n,–container 分配多少个yarn容器 (=taskmanager的数量)
-D 动态属性
-d,–detached 独立运行
-jm,–jobManagerMemory JobManager的内存 [in MB]
-nm,–name 在YARN上为一个自定义的应用设置一个名字
-q,–query 显示yarn中可用的资源 (内存, cpu核数)
-qu,–queue 指定YARN队列.
-s,–slots 每个TaskManager使用的slots数量
-tm,–taskManagerMemory 每个TaskManager的内存 [in MB]
-z,–zookeeperNamespace 针对HA模式在zookeeper上创建NameSpace
执行上面命令来分配 4个 TaskManager,每个都拥有 4GB 的内存和 32 个 slot,同时会请求启动 5 个容器,因为对于 ApplicationMaster 和 JobManager 还需要一个额外的容器。
注:
A. Flink的JobManager和TaskManager的内存大小不要小于YARNContainer的最小值(yarn.scheduler.minimum-allocation-mb,默认值为1024MB)。
B. 请注意客户端需要提前设置环境变量 YARN_CONF_DIR 或 HADOOP_CONF_DIR,用来读取 YARN 和 HDFS 配置。
flink on yarn ,只需在 yarn 的gateway机器上安装一个flink , 这个flink 当做flink client 机器 (也即flink 代码提交的入口) ----->启动yarn-session,并设置参数------> 提交任务 flink run 。。。。