简介
本文介绍KAFKA的特性和集群安装以及配合ACTIVEMQ使用
什么是KAFKA
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
为什么使用KAFKA
当数据是由上游模块产生,上游模块,使用上游模块的数据计算、统计、分析,这个时候就可以使用消息系统,尤其是分布式消息系统!kafka主要用来存储和接收上游或者下游的数据,也可以自己处理数据.下图是KAFKA和其他分布式消息系统的对比.
KAFKA 协议AMOP
Advanced Message Queuing Protocol (高级消息队列协议)
The Advanced Message Queuing Protocol (AMQP):是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。
因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。
KAFKA特性
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
支持通过Kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
KAFAK数据备份保证了再其他机器down机的时候,集群的稳定性,同时消息会replication备份,保证数据不丢失,且保证了消息的唯一性。
但是KAFKA尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等),消息消费失败时需要consumer手动重置offset来重新消费该条消息。
KAFKA如何能支持消息数据的高并发
1、持久性
kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化几乎没有可能.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.
2、性能
需要考虑的影响性能点很多,除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换. 其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式.
3、生产者
负载均衡: producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”.事实上,消息被路由到哪个partition上,有producer客户端决定.比如可以采用”random””key-hash””轮询”等,如果一个topic中有多个partitions,那么在producer端实现”消息均衡分发”是必要的.
其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件.
异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。
4、消费者
consumer端向broker发送”fetch”请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.
KAFKA 术语分析
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(
物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition,数据也是按partition来存储的,简称为分区.
Producer:负责发布消息到Kafka broker
Consumer:消息消费者,向Kafka broker读取消息的客户端。
Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
KAFKA架构
kafka集群中的消息,是通过Topic(主题)来进行组织的,一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”,其结构如下图:
搭建KAFKA集群
1. 下载KAFKA并解压(假设zookeeper集群已经搭建好了)
下载KAFKAhttp://kafka.apache.org/downloads.html
2. 修改配置文件kafka/config/server.properties
#
#node1下
broker.id=3
advertised.listeners=PLAINTEXT:////your.host.name:9092
log.dirs=/home/hadoop/kafka_log
zookeeper.connect=192.168.80.123:2181,192.168.80.124:2181,192.168.80.125:2181
#node2下
broker.id=2
advertised.listeners=PLAINTEXT:////your.host.name:9092
log.dirs=/home/hadoop/kafka_log
zookeeper.connect=192.168.80.123:2181,192.168.80.124:2181,192.168.80.125:2181
#node3下
broker.id=3
advertised.listeners=PLAINTEXT:////your.host.name:9092
log.dirs=/home/hadoop/kafka_log
zookeeper.connect=192.168.80.123:2181,192.168.80.124:2181,192.168.80.125:2181
3.启动zookeeper和kafka集群
启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka bin/kafka-server-start.sh config/server.properties(集群下需要三台都启动)
###创建主题
bin/kafka-topics.sh --create --zookeeper 131.10.10.202:2181,131.10.10.203:2181,131.10.10.204:2181 --replication-factor 3 --partitions 8 --topic ff
###查看主题
bin/kafka-topics.sh --list --zookeeper 131.10.10.202:2181,131.10.10.203:2181,131.10.10.204:2181
###删除主题
bin/kafka-topics.sh --delete --zookeeper 131.10.10.202:2181,131.10.10.203:2181,131.10.10.204:2181 --topic test
启动生产着 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动消费者 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
发送消息(模拟kafka发送消息):
./kafka-console-producer.sh --broker-list 131.10.10.202:9092,131.10.10.203:9092,131.10.10.204:9092 –topic test
接收消息:
./kafka-console-consumer.sh -zookeeper 131.10.10.202:2181,131.10.10.203:2181,131.10.10.204:2181 --topic test
使用postman提交配置和删除查看配置
由于Kafka Connect的目的是作为一个服务运行,提供了一个用于管理connector的REST API。默认情况下,此服务的端口是8083。以下是当前支持的终端入口:
创建topic提交的数据
com.company.connector.sink.ActiveMQSinkConnector是自定义的类,并添加JAR到kafka的libs目录中供使用
{
"name":"iot-data-kafka-mqtt-source",
"config":{
"name":"iot-data-kafka-mqtt-source",
"connector.class":"com.company.connector.source.ActiveMQSourceConnector",
"topics":"ThingsUpstreamSourceTopic",
"tasks.max":"10",
"activemq.server_uris":"failover:(tcp://131.10.10.68:50000)?randomize=false",
"activemq.destination":"Consumer.Iot-Hub-Kafka-Mqtt.VirtualTopic.conpany.v1.things.*.>"//订阅的MQTT的topic
}
}
//--------------------------------------------------------------------------------------
{
"name":"iot-data-kafka-mqtt-sink",
"config":{
"name":"iot-data-kafka-mqtt-sink",
"connector.class":"com.company.connector.sink.ActiveMQSinkConnector",
"topics":"ThingsDownstreamMqttSinkTopic",
"tasks.max":"10",
"activemq.server_uris":"failover:(tcp://131.10.10.68:50000)?randomize=false"
}
}
常用的rest接口
GET /connectors 返回活跃的connector列表。
POST /connectors 请求的主体是包含name字段和对象config字段(connector的配置参数)的JSON对象。
GET /connectors/{name} 获取指定connector的信息。
GET /connectors/{name}/config 获取指定connector的配置参数。
PUT /connectors/{name}/config 更新指定connector的配置参数。
GET /connectors/{name}/status 获取connector的当前状态,包括它是否正在运行,失败,暂停等。
GET /connectors/{name}/tasks 获取当前正在运行的connector的任务列表。
GET /connectors/{name}/tasks/{taskid}/status 获取任务的当前状态,包括是否是运行中的,失败的,暂停的等。
PUT /connectors/{name}/pause 暂停连接器和它的任务,停止消息处理,直到connector恢复。
PUT /connectors/{name}/resume 恢复暂停的connector(如果connector没有暂停,则什么都不做)。
POST /connectors/{name}/restart 重启connector(connector已故障)。
POST /connectors/{name}/tasks/{taskId}/restart 重启单个任务 (通常这个任务已失败)。
DELETE /connectors/{name} 删除connector, 停止所有的任务并删除其配置。
总结
KAFKA作为支持高并发,高可用的分布式消息系统现在广泛的应用在各大商业平台,如果你需要对消息处理做高并发的支持,并且希望消息日志和消息具有高可靠性,可以选用KAFAKA做分布式消息系统,其结合其他分布式消息系统可组成一套高可用、高并发的消息处理流程,例如activeMQ做上游消息处理,kafka集群做中间层持久化数据和消息分发,使用storm做消息的实时处理.