Kafka 是一种高吞吐量的分布式发布订阅消息系统。(Apache Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统, 使用 Scala 与 Java 语言编写,能够将消息从一个端点传递到另一个端点。)
较之传统的消息中间件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、内置分区、支持消息副本和高容错的特性,非常适合大规模消息处理应用程序。
Kafka的特点:优势
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】
支持通过Kafka服务器和消费机集群来区分消息,也就是可以对消息进行分类,然后使用不同分类的服务器消费机去消费不同分类的消息。
支持Kafka Server间的消息分区,及分布式消息消费,同时保证每个Partition内的消息顺序传输。
分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
消息持久化,所有的消息均被持久化到磁盘,无消息丢失,支持消息重放
同时支持离线数据处理和实时数据处理。
支持Hadoop并行数据加载。
消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
相关概念
Broker:kafka集群中的一台或者多台服务器统称为broker。 Topic:Kafka处理的消息源(feeds of messages)的不同分类,可以理解为消息分类。 Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分 配一个有序的id(offset)。也就是可以理解为一个群的群名称或者群号,因为大家都在这个群里面消费,成为分类,然后消费topic的时 候进行物理分组,比如一个partition不够用,可以分配给多个partition。 Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。 Producers:消息和数据的生产者,向Kafka的一个topic发布消息的过程叫做producers。 Consumers:消息和数据消费者,订阅topics并处理其发布的消息过程叫做consumers。
Kafka的架构:
Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。
Kafka消息发送的流程:
PHP下面有通用的两种方式来调用 Kafka。
php-rdkafka 扩展
以 PHP 扩展的形式进行使用是非常高效的。另外,该项目也提供了非常完备的 文档 。
kafka-php 扩展包
Kafka-php 使用纯粹的 PHP 编写的 Kafka 客户端,目前支持 0.8.x 以上版本的 Kafka。由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本。
【安装 Kafka 服务】
官网:http://kafka.apache.org/downloads.html ,下载二进制资源(Binary downloads)
1.下载、解压
[root@localhost ~]# wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz [root@localhost ~]# mv kafka_2.13-2.6.0.tgz /usr/local/src/ [root@localhost ~]# cd /usr/local/src/ [root@localhost src]# tar -zxvf kafka_2.13-2.6.0.tgz [root@localhost src]# cd kafka_2.13-2.6.0/ 这里下载的是二进制版本(V3.1.0)。kafka自带打包和配置好 zookeeper,无需单独安装zookeeper。解压后,可以看到目录结构如下: kafka Kafka 根目录 ├─bin Kafka 运行的脚本 │ ├─connect-distributed.sh 连接 kafka 集群模式 │ ├─connect-standalone.sh 连接 kafka 单机模式 │ ├─kafka-acls.sh │ ├─kafka-broker-api-versions.sh │ ├─kafka-configs.sh 配置管理脚本 │ ├─kafka-console-consumer.sh kafka 消费者控制台 │ ├─kafka-console-producer.sh kafka 生产者控制台 │ ├─kafka-consumer-groups.sh kafka 消费者组相关信息 │ ├─kafka-consumer-perf-test.sh kafka 消费者性能测试 │ ├─kafka-delegation-tokens.sh │ ├─kafka-delete-records.sh 删除低水位的日志文件 │ ├─kafka-dump-log.sh │ ├─kafka-log-dirs.sh kafka消息日志目录 │ ├─kafka-mirror-maker.sh 不同数据中心 kafka 集群复制工具 │ ├─kafka-preferred-replica-election.sh 触发 preferred replica 选举 │ ├─kafka-producer-perf-test.sh kafka 生产者性能测试脚本 │ ├─kafka-reassign-partitions.sh 分区重分配脚本 │ ├─kafka-replica-verification.sh 复制进度验证脚本 │ ├─kafka-run-class.sh │ ├─kafka-server-start.sh 启动 kafka 服务 │ ├─kafka-server-stop.sh 停止 kafka 服务 │ ├─kafka-streams-application-reset.sh │ ├─kafka-topics.sh kafka主题 │ ├─kafka-verifiable-consumer.sh 可检验的 kafka 消费者 │ ├─kafka-verifiable-producer.sh 可检验的 kafka 生产者 │ └─trogdor.sh │ ├─windows 在 Windows 系统下执行的脚本目录 │ │ ├─connect-distributed.bat │ │ └─ … 更多 windows 下执行的脚本文件 │ ├─zookeeper-security-migration.sh │ ├─zookeeper-server-start.sh 启动 zk 服务 │ ├─zookeeper-server-stop.sh 停止 zk 服务 │ └─zookeeper-shell.sh zk 客户端脚本 │ ├─config Kafka、zookeeper 等配置文件 │ ├─connect-console-sink.properties │ ├─connect-console-source.properties │ ├─connect-distributed.properties │ ├─connect-file-sink.properties │ ├─connect-file-source.properties │ ├─connect-log4j.properties │ ├─connect-standalone.properties │ ├─consumer.properties 消费者配置 │ ├─log4j.properties │ ├─producer.properties 生产者配置 │ ├─server.properties kafka 服务配置 │ ├─tools-log4j.properties │ ├─trogdor.conf │ └─zookeeper.properties zk 服务配置 │ ├─libs Kafka 运行的依赖库 │ ├─activation-1.1.1.jar │ └─... │ ├─site-docs/ Kafka 相关文档 │ ├─kafka_2.12-2.3.1-site-docs.tgz [root@localhost local]# mv kafka_2.13-3.1.0/ kafka 查看kafka当前版本 1.进到kafka的安装目录 2.执行下列语句: [root@localhost kafka]# find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*' kafka_2.13-3.1.0.jar 就可以看到kafka的具体版本了。其中,2.13为scala版本,3.1.0为kafka版本。
2.启动 Kafka 服务
2.1 启动Zookeeper server(使用安装包中的脚本启动单节点 Zookeeper 实例)
[root@localhost kafka_2.13-2.6.0]# bin/zookeeper-server-start.sh config/zookeeper.properties & 或 [root@localhost kafka_2.13-2.6.0]# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties -daemon 可启动后台守护模式
2.2 启动Kafka server(使用 kafka-server-start.sh 启动 kafka 服务)
[root@localhost kafka_2.13-2.6.0]# bin/kafka-server-start.sh config/server.properties & 或 [root@localhost kafka_2.13-2.6.0]# bin/kafka-server-start.sh -daemon config/server.properties
2.3 创建主题
1、启动kafka客户端测试 # 创建一个Topic话题,test话题2个分区 [root@localhost kafka_2.13-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 2 Created topic "test". --zookeeper 指定了 Kafka 所连接的 ZooKeeper 服务地址, --create 是创建主题的动作指令, --bootstrap-server 指定了连接的 Kafka 集群地址, --topic 指定了所要创建主题的名称, --replication-factor 指定了副本因子, --partitions 指定了分区个数。 即创建了一个分区为 2、副本因子为 1 的主题 topic-demo。 注意:在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即 --zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代 --zookeeper localhost:2181 [root@localhost kafka]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --replication-factor 1 --partitions 2 Created topic test. 2、查看 topic 列表,检查是否创建成功 [root@localhost kafka_2.13-2.6.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181 test 最新版本 [root@localhost kafka]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --list test 3、显示topic信息 [root@localhost kafka_2.13-2.6.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:2 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 最新版本 [root@localhost kafka]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe Topic: test TopicId: YXU2WX3iQxCudTBJ50fFrw PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
2.4 运行生产者producer
# 启动一个生产者(输入消息) [root@localhost kafka_2.13-2.6.0]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test --broker-list 指定了连接的 Kafka 集群地址 --topic 指定了发送消息时的主题。
2.5 运行消费者consumer
另开一个终端,执行如下命令 # 启动一个消费者(等待消息) # 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果 [root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test [root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --property print.key=true --topic test --bootstrap-server 指定了连接的 Kafka 集群地址, --topic 指定了消费者订阅的主题。 # 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning [等待消息] i am a new msg ! i am a good msg ?
可以看到,当使用 kafka-console-producer.sh 脚本发送消息至主题 topic-demo后,当前终端窗口会同步刚刚输入的消息内容,这说明,消费者消费了消息.
3.当有跨机的producer或consumer连接时
需要配置config/server.properties的host.name,要不然跨机的连不上。
虽然 Kafka 是用 Java/Scala 语言编写的,但这不妨碍它对多语言的支持。可以在 Kafka 官网的 CLIENTS 查看 Kafka 支持的语言,其中包括 C/C++、Python、Go 等语言。
PHP 操作 Kafka 需要安装 librdkafka 库和 kafka 的 PHP 扩展。
【rakafka扩展安装】
在使用 PHP 处理 Kafka 消息的时候需要使用一个 PHP 的扩展 php-rdkafka 下面将介绍一下如何在 Linux / Mac OS 下安装 php-rdkafka。
rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka
【安装librdkafka 库】https://github.com/edenhill/librdkafka
[root@localhost ~]# git clone https://github.com/edenhill/librdkafka.git [root@localhost ~]# mv librdkafka/ /usr/local/src/ [root@localhost ~]# cd /usr/local/src/ [root@localhost src]# cd librdkafka/ [root@localhost librdkafka]# ./configure [root@localhost librdkafka]# make [root@localhost librdkafka]# make install
【安装php-rdkafka 扩展】https://github.com/arnaud-lb/php-rdkafka
[root@localhost ~]# git clone https://github.com/arnaud-lb/php-rdkafka.git [root@localhost ~]# mv php-rdkafka/ /usr/local/src/ [root@localhost ~]# cd php-rdkafka/ [root@localhost ~]# cd /usr/local/src/ [root@localhost src]# cd php-rdkafka/ #生成configure文件 [root@localhost php-rdkafka]# /usr/local/php7/bin/phpize Configuring for: PHP Api Version: 20180731 Zend Module Api No: 20180731 Zend Extension Api No: 320180731 #编译安装 [root@localhost php-rdkafka]# ./configure --with-php-config=/usr/local/php7/bin/php-config [root@localhost php-rdkafka]# make && make install .... Installing shared extensions: /usr/local/php7/lib/php/extensions/no-debug-non-zts-20180731/ #在php.ini 文件中配置 rdkafka扩展 ##注意 php.ini实际加载路径 [root@localhost php-rdkafka]# vim /usr/local/php7/etc/php.ini extension=rdkafka [root@localhost php-rdkafka]# systemctl restart php-fpm.service #查看扩展是否生效 [root@localhost php-rdkafka]# php -m | grep kafka
【编码实现】
1.【生产者】-- 新建 producer.php
<?php /** * 消息生产者 * * 实现的例子来源于: * * https://github.com/arnaud-lb/php-rdkafka#examples */ $objRdKafka = new RdKafka\Producer(); $objRdKafka->setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers("localhost:9092"); $oObjTopic = $objRdKafka->newTopic("test"); // 从终端接收输入 $oInputHandler = fopen('php://stdin', 'r'); while (true) { echo "\nEnter messages:\n"; $sMsg = trim(fgets($oInputHandler)); // 空消息意味着退出 if (empty($sMsg)) { break; } /** * 发送消息 * * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition. * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。 * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue. * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。 * The message payload can be anything. * 消息可以是任何内容。 */ $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg); } echo "done\n";
2. 【检验发送是否成功】
终端开启一个消费者:
# 因为生产者会往test的topic中发送消息,消费者直接消费test即可
[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test 生产者端发送: [root@localhost ~]# php producer.php Enter messages: one Enter messages: two 消费者端接收: [root@localhost ~]# kafka-console-consumer --bootstrap-server localhost:9092 --topic test one two
3.【消费者】-- 新建 consumer.php
<?php /** * 消费者消费消息 * * 实现的例子来源于: * * https://github.com/arnaud-lb/php-rdkafka#examples */ $objRdKafka = new RdKafka\Consumer(); $objRdKafka->setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers("localhost:9092"); $oObjTopic = $objRdKafka->newTopic("test"); /** * consumeStart * 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息 * 第二个参数标识从什么位置开始拉取消息,可选值为 * RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息 * RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息 * RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样 */ $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END); while (true) { // 第一个参数是分区,第二个参数是超时时间 $oMsg = $oObjTopic->consume(0, 1000); // 没拉取到消息时,返回NULL if (!$oMsg) { usleep(10000); continue; } if ($oMsg->err) { echo $msg->errstr(), "\n"; break; } else { echo $oMsg->payload, "\n"; } }
4. 【检验】
生产者端发送: [root@localhost ~]# php producer.php Enter messages: 123 Enter messages: 456 消费者端接收: [root@localhost ~]# php consumer.php 123 456
注意:
消息是异步的,死循环是等待消息发送成功,相关其他处理方发:https://www.jianshu.com/p/cf74346acf13
相关文档:
官方文档:http://kafka.apache.org/documentation/#quickstart
kafka PHP官方扩展文档:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html
kafka 配置项的官方说明: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Kafka 基础:https://www.jianshu.com/p/0b802b65ef11
kafka中文教程:http://orchome.com/kafka/index
安装: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.setup.html
示例: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples.html
librdkafka库 : https://github.com/edenhill/librdkafka
PHP-Kafka客户端 :https://github.com/arnaud-lb/php-rdkafka
低级、高级消费模式:https://www.cnblogs.com/wenhainan/p/10932147.htm
相关示例:
https://www.cnblogs.com/wt645631686/p/8303076.html
https://www.cnblogs.com/liuxinyustu/articles/12488945.html
本文为崔凯原创文章,转载无需和我联系,但请注明来自冷暖自知一抹茶ckhttp://www.cksite.cn