初认识
特点:高水平扩展、高吞吐。消息中间件。不支持事务(AMQ可支持)。支持动态扩容(通过zookeeper实现)。
协议:AMQP协议
结构:producer--broker--cosumer
topic
partition:一个topic中的消息数据按照多个分区组织,分区时kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO的队列
Broker1 Broker2 Broker3
partition-0 partition-1 partition-2
partition-1 partition-2 partition-0
如上,即3个partition,存了两份。并且,对每个分区进行备份(Replication)。把每个分区上面的数据放到不同的broker上,防止某个broker 宕机,造成分区数据不可用。
越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力
一、搭建zookeeper集群
数量:一般为3+,且奇数,因为zk可实现接近半数服务器异常的情况下,依然提供服务,即3台里面可以允许1台宕机。
环境:JDK 1.7,zookeeper需要运行在JVM上面。版本3.4.6
1、安装JDK
解压,配置系统环境变量JAVA_HOME到jdk所在目录/bin。
java -version
2、安装zk
解压,创建两个目录zkdata、zkdatalog。
conf目录下有sample,cp zoo_sample.cfg zoo.cfg[一定的]。其中,dataDir是快照日志【我理解就是比如节点的信息】的存储路径,修改为之前创建的zkdata。新增一个配置想dataLogDir是zk的事务日志存放目录。[如果不另配置,将都放在dataDir中]。端口。集群配置:server.1[1是本台机器的标示]=IP 2888[默认是2888,是master和slave之间通信的端口] 3888[leader选举的端口],再有server.2=IP 2888 3888,server.3=IP 2888 3888.
进入dataDir所在的目录,创建myid的文件,内容为1[即上文上提到的本机标示1]
clientport 是client来连接本机的端口,集群中的此端口不必保持一致
ticktime是zk中的时间单元
3、启动zk
bin目录下,./zkServer.sh start。分别在2台机器上面执行
查看集群状态
./zkServer.sh status
可以看到mode:foollower或者是leader。一般只有一个leader,用于响应client端的读写请求。slaver从leader同步数据。leader异常时,会通过投票,在slaver中选一个当leader
另,
log4j文件,日志输出格式的文件。zk不会自主清除日志文件,需定期清理。
zoo.cfg:tickTime,指的是initLimit和syncLimit的时间单位(ms)。initLimit是集群启动的时候,达成一致状态的时间。若到这个时间还没启动好,就回报失败。syncLimit是master给slave发心跳并回给master,这之间的往返最大时间。若超过此时间,master就认为此slave已经死机。
查看:
./zkCli.sh
ls / 一定要加/才可以
出来zk目录外,其他均为kfka创建的
get *
二、搭建kafka集群
数量一般大于等于2,已经搭建好的zk集群。
1、安装
解压,创建目录kafkaLogs,用于存放kafka的消息。
2、配置
config/
server.properties 最重要
zookeeprt.properties kafka自带的zk,kafka也可以使用自带的zk来启动,但不推荐这样的方式
在server.properties中:
broker.id=0第一台
port
host.name=IP,考虑到DNS解析是有失败率的,会泄漏文件句柄
num.network.thread= broker进行网络处理的线程数,一般不修改
num.io.thread broker进行IO处理的线程数,需大于log.dirs的个数,以保证一个线程处理一个目录
socket.send.buffer.bytes kafka发送消息的缓冲区大小
socket.receive.buffer.bytes 接收
socket.request.max.bytes 发送或接收消息的请求的最大数。不能超过java的堆栈大小
log.dirs= 队列中的消息,持久化的地方,即上面创建的kafkaLogs。可配制多个,以逗号分割,新topic会放到最少的那个里面
num,partition 一个topic默认的分区数
log.retention.hours kafka上面的消息的失效期
message.max.byte=5048576 kafka可接收的每条消息的最大大小
default.relication.factor=2 kafka集群保存消息的副本数,默认1
replication.fetch.max.bytes=5M 取消息的最大连接数
log.segment.bytes 由于文件被一直追加写,知道达到这个大小的时候,会新起一个文件。即持久化的文件的最大大小
log.retention.check.interval.ms 每个这么多时间,扫描下是否到失效时间
zookeeper.connect=本机IP:port,另一台IP:port,再另一台IP:port
zookeeper.conection kafka连接zk的超时时间。
3、启动
bin目录下 ./kafka-server-start.sh -deamon[后台启动]../config/server.properties[指定配置文件]
jps看进程在不在
日志:
server.log kafka的运行日志
state-change.log 切换日志。leader切换
controller.log crotroller的信息
其他配置文件:
1——consumer
zk.connet=一般会使用server中的,这里不用管
zk,connnection.timeout.ms
group.id 组织一个topic下面的多个partition。假设一个topic有两个group id,那么一个group id对应一个consumer组。不同的consumer组可以复制消息这个topic消息。即此一个topic消息可以被两个程序拿两遍
2——producer
broker.list 已在server中配置
producer.type 消息发送方式,默认为同步sync。可以在程序中改动,一般不改这里
compression.codec 压缩的手段
serializer.class=kafka.serializer.DefaultEncoder
offset
不同于AMQ,kafka在消息被消费之后,仍会根据broker的设置,保留一段时间。到了时间后,无论消息是否被消费,都会被删除。
每条消息在文件中的位置称为offset(偏移量),(上面的图)offset为一个long型数字,它唯一的标记一条消息【即,消息是靠偏移量来确认的】。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。
对于consumer而言,它负责保存偏移量。当consumer正常消费消息时,offset将会”线性”的向前驱动,【我理解类似游标】,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值..(offset将会保存在zookeeper中,参见下文)kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.
基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为”leader”;leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个”leader”,kafka会将”leader”均衡的分散在每个实例上,来确保整体的性能稳定.【我理解,在partition0里面,可能备份了2份,即两个server,其中一个为leader,另一个为follower。这里面leader、follow都是zk的概念】
producer:Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于”round-robin”方式或者通过其他的一些算法等.
consumer:一个consumer group里面有多个consumer,一个consumer只可以属于一个consumer group