Linux kafka服务安装及rdkafka扩展使用

        

        Kafka 是一种高吞吐量的分布式发布订阅消息系统。(Apache Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统, 使用 Scala 与 Java 语言编写,能够将消息从一个端点传递到另一个端点。)

        较之传统的消息中间件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、内置分区、支持消息副本和高容错的特性,非常适合大规模消息处理应用程序。


        Kafka的特点:优势

  •         高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】

  •         支持通过Kafka服务器和消费机集群来区分消息,也就是可以对消息进行分类,然后使用不同分类的服务器消费机去消费不同分类的消息。

  •         支持Kafka Server间的消息分区,及分布式消息消费,同时保证每个Partition内的消息顺序传输。

  •         分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。

  •         消息持久化,所有的消息均被持久化到磁盘,无消息丢失,支持消息重放

  •         同时支持离线数据处理和实时数据处理。

  •         支持Hadoop并行数据加载。

  •         消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。


        相关概念

Broker:kafka集群中的一台或者多台服务器统称为broker。

Topic:Kafka处理的消息源(feeds of messages)的不同分类,可以理解为消息分类。

Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分
配一个有序的id(offset)。也就是可以理解为一个群的群名称或者群号,因为大家都在这个群里面消费,成为分类,然后消费topic的时
候进行物理分组,比如一个partition不够用,可以分配给多个partition。

Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。

Producers:消息和数据的生产者,向Kafka的一个topic发布消息的过程叫做producers。

Consumers:消息和数据消费者,订阅topics并处理其发布的消息过程叫做consumers。

         Kafka的架构:

冷暖自知一抹茶ck

        Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。

        Kafka消息发送的流程:

        

冷暖自知一抹茶ck



        

        PHP下面有通用的两种方式来调用 Kafka。

                php-rdkafka 扩展

                        以 PHP 扩展的形式进行使用是非常高效的。另外,该项目也提供了非常完备的 文档

                kafka-php 扩展包

                        Kafka-php 使用纯粹的 PHP 编写的 Kafka 客户端,目前支持 0.8.x 以上版本的 Kafka。由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本。


【安装 Kafka 服务】

        官网:http://kafka.apache.org/downloads.html   ,下载二进制资源(Binary downloads)

冷暖自知一抹茶ck

        

        1.下载、解压

[root@localhost ~]# wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
[root@localhost ~]# mv kafka_2.13-2.6.0.tgz /usr/local/src/
[root@localhost ~]# cd /usr/local/src/
[root@localhost src]# tar -zxvf kafka_2.13-2.6.0.tgz 
[root@localhost src]# cd kafka_2.13-2.6.0/

这里下载的是二进制版本(V3.1.0)。kafka自带打包和配置好 zookeeper,无需单独安装zookeeper。解压后,可以看到目录结构如下:
    kafka                                 Kafka 根目录
    ├─bin                                               Kafka 运行的脚本
    │  ├─connect-distributed.sh                         连接 kafka 集群模式
    │  ├─connect-standalone.sh                          连接 kafka 单机模式
    │  ├─kafka-acls.sh                 
    │  ├─kafka-broker-api-versions.sh         
    │  ├─kafka-configs.sh                             配置管理脚本
    │  ├─kafka-console-consumer.sh                       kafka 消费者控制台
    │  ├─kafka-console-producer.sh                        kafka 生产者控制台
    │  ├─kafka-consumer-groups.sh                        kafka 消费者组相关信息
    │  ├─kafka-consumer-perf-test.sh                     kafka 消费者性能测试
    │  ├─kafka-delegation-tokens.sh        
    │  ├─kafka-delete-records.sh                          删除低水位的日志文件
    │  ├─kafka-dump-log.sh                 
    │  ├─kafka-log-dirs.sh                            kafka消息日志目录
    │  ├─kafka-mirror-maker.sh                          不同数据中心 kafka 集群复制工具
    │  ├─kafka-preferred-replica-election.sh            触发 preferred replica 选举
    │  ├─kafka-producer-perf-test.sh                      kafka 生产者性能测试脚本
    │  ├─kafka-reassign-partitions.sh                      分区重分配脚本
    │  ├─kafka-replica-verification.sh                  复制进度验证脚本
    │  ├─kafka-run-class.sh        
    │  ├─kafka-server-start.sh                          启动 kafka 服务
    │  ├─kafka-server-stop.sh                           停止 kafka 服务
    │  ├─kafka-streams-application-reset.sh         
    │  ├─kafka-topics.sh                                kafka主题
    │  ├─kafka-verifiable-consumer.sh                    可检验的 kafka 消费者
    │  ├─kafka-verifiable-producer.sh                     可检验的 kafka 生产者
    │  └─trogdor.sh
    │  ├─windows                                        在 Windows 系统下执行的脚本目录
    │  │  ├─connect-distributed.bat
    │  │  └─ …                                          更多 windows 下执行的脚本文件
    │  ├─zookeeper-security-migration.sh            
    │  ├─zookeeper-server-start.sh                        启动 zk 服务
    │  ├─zookeeper-server-stop.sh                        停止 zk 服务
    │  └─zookeeper-shell.sh                                zk 客户端脚本
    │
    ├─config                                            Kafka、zookeeper 等配置文件
    │  ├─connect-console-sink.properties            
    │  ├─connect-console-source.properties          
    │  ├─connect-distributed.properties             
    │  ├─connect-file-sink.properties               
    │  ├─connect-file-source.properties             
    │  ├─connect-log4j.properties                   
    │  ├─connect-standalone.properties              
    │  ├─consumer.properties                            消费者配置
    │  ├─log4j.properties                           
    │  ├─producer.properties                             生产者配置
    │  ├─server.properties                                 kafka 服务配置
    │  ├─tools-log4j.properties       
    │  ├─trogdor.conf       
    │  └─zookeeper.properties                           zk 服务配置
    │
    ├─libs                                                 Kafka 运行的依赖库
    │  ├─activation-1.1.1.jar
    │  └─...                          
    │
    ├─site-docs/                                         Kafka 相关文档
    │  ├─kafka_2.12-2.3.1-site-docs.tgz


[root@localhost local]# mv kafka_2.13-3.1.0/ kafka
查看kafka当前版本
    1.进到kafka的安装目录
    2.执行下列语句:
[root@localhost kafka]# find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
kafka_2.13-3.1.0.jar
就可以看到kafka的具体版本了。其中,2.13为scala版本,3.1.0为kafka版本。

        2.启动 Kafka 服务

        2.1 启动Zookeeper server(使用安装包中的脚本启动单节点 Zookeeper 实例)

[root@localhost kafka_2.13-2.6.0]# bin/zookeeper-server-start.sh config/zookeeper.properties &
或 
[root@localhost kafka_2.13-2.6.0]# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties -daemon 可启动后台守护模式

        2.2 启动Kafka server(使用 kafka-server-start.sh 启动 kafka 服务)

[root@localhost kafka_2.13-2.6.0]# bin/kafka-server-start.sh config/server.properties &
或 
[root@localhost kafka_2.13-2.6.0]# bin/kafka-server-start.sh -daemon config/server.properties

        2.3 创建主题

1、启动kafka客户端测试
# 创建一个Topic话题,test话题2个分区
[root@localhost kafka_2.13-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 2
Created topic "test".

--zookeeper 指定了 Kafka 所连接的 ZooKeeper 服务地址,
--create 是创建主题的动作指令,
--bootstrap-server 指定了连接的 Kafka 集群地址,
--topic 指定了所要创建主题的名称,
--replication-factor 指定了副本因子,
--partitions 指定了分区个数。
即创建了一个分区为 2、副本因子为 1 的主题 topic-demo。

注意:在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即 --zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代  --zookeeper localhost:2181
[root@localhost kafka]# bin/kafka-topics.sh --bootstrap-server localhost:9092  --create --topic test --replication-factor 1 --partitions 2
Created topic test.


2、查看 topic 列表,检查是否创建成功
[root@localhost kafka_2.13-2.6.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test
最新版本
    [root@localhost kafka]# bin/kafka-topics.sh --bootstrap-server localhost:9092  --list
    test

3、显示topic信息
[root@localhost kafka_2.13-2.6.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
最新版本
[root@localhost kafka]# bin/kafka-topics.sh --bootstrap-server localhost:9092  --describe
Topic: test    TopicId: YXU2WX3iQxCudTBJ50fFrw    PartitionCount: 2    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0

        2.4 运行生产者producer

# 启动一个生产者(输入消息)
[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

--broker-list 指定了连接的 Kafka 集群地址
--topic 指定了发送消息时的主题。

        2.5 运行消费者consumer

另开一个终端,执行如下命令
# 启动一个消费者(等待消息)
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --property print.key=true --topic test

--bootstrap-server 指定了连接的 Kafka 集群地址,
--topic 指定了消费者订阅的主题。

# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?

        可以看到,当使用 kafka-console-producer.sh 脚本发送消息至主题 topic-demo后,当前终端窗口会同步刚刚输入的消息内容,这说明,消费者消费了消息.


        3.当有跨机的producer或consumer连接时

        需要配置config/server.properties的host.name,要不然跨机的连不上。



        虽然 Kafka 是用 Java/Scala 语言编写的,但这不妨碍它对多语言的支持。可以在 Kafka 官网的 CLIENTS 查看 Kafka 支持的语言,其中包括 C/C++、Python、Go 等语言。

        PHP 操作 Kafka 需要安装 librdkafka 库和 kafka 的 PHP 扩展。


【rakafka扩展安装】

         在使用 PHP 处理 Kafka 消息的时候需要使用一个 PHP 的扩展 php-rdkafka 下面将介绍一下如何在 Linux / Mac OS 下安装 php-rdkafka。

        rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka

        

        【安装librdkafka 库】https://github.com/edenhill/librdkafka 

[root@localhost ~]# git clone https://github.com/edenhill/librdkafka.git
[root@localhost ~]# mv librdkafka/ /usr/local/src/
[root@localhost ~]# cd /usr/local/src/
[root@localhost src]# cd librdkafka/
[root@localhost librdkafka]# ./configure
[root@localhost librdkafka]# make
[root@localhost librdkafka]# make install

        【安装php-rdkafka 扩展】https://github.com/arnaud-lb/php-rdkafka 

[root@localhost ~]# git clone https://github.com/arnaud-lb/php-rdkafka.git
[root@localhost ~]# mv php-rdkafka/ /usr/local/src/
[root@localhost ~]# cd php-rdkafka/
[root@localhost ~]# cd /usr/local/src/
[root@localhost src]# cd php-rdkafka/	

#生成configure文件
[root@localhost php-rdkafka]# /usr/local/php7/bin/phpize
Configuring for:
PHP Api Version:         20180731
Zend Module Api No:      20180731
Zend Extension Api No:   320180731


#编译安装
[root@localhost php-rdkafka]# ./configure --with-php-config=/usr/local/php7/bin/php-config
[root@localhost php-rdkafka]# make && make install
....
Installing shared extensions:     /usr/local/php7/lib/php/extensions/no-debug-non-zts-20180731/

#在php.ini 文件中配置 rdkafka扩展
##注意 php.ini实际加载路径
[root@localhost php-rdkafka]# vim /usr/local/php7/etc/php.ini
extension=rdkafka
[root@localhost php-rdkafka]# systemctl restart  php-fpm.service

#查看扩展是否生效
[root@localhost php-rdkafka]# php -m | grep kafka


【编码实现】

        1.【生产者】-- 新建 producer.php

<?php
/**
 * 消息生产者
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Producer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

// 从终端接收输入 
$oInputHandler = fopen('php://stdin', 'r');

while (true) {
    echo "\nEnter  messages:\n";
    $sMsg = trim(fgets($oInputHandler));

   // 空消息意味着退出
    if (empty($sMsg)) {
        break;
    }

    /**
     * 发送消息
     *
     * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
     * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
     * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
     * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
     * The message payload can be anything.
     * 消息可以是任何内容。
     */
    $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}

echo "done\n";


        2. 【检验发送是否成功】

        终端开启一个消费者:

        # 因为生产者会往test的topic中发送消息,消费者直接消费test即可

[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

生产者端发送:	[root@localhost ~]# php producer.php
Enter messages:
one

Enter messages:
two

消费者端接收:	[root@localhost ~]# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
one
two

        

        3.【消费者】-- 新建 consumer.php

<?php

/**
 * 消费者消费消息
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
   // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
            continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "\n";
	break;
    } else {
        echo $oMsg->payload, "\n";
    }
}

        4. 【检验】

生产者端发送:	[root@localhost ~]# php producer.php
Enter messages:
123
						
Enter messages:
456
		
消费者端接收:	[root@localhost ~]# php consumer.php
123
456		
	

        

    注意:

           消息是异步的,死循环是等待消息发送成功,相关其他处理方发:https://www.jianshu.com/p/cf74346acf13 

冷暖自知一抹茶ck




相关文档:


        官方文档:http://kafka.apache.org/documentation/#quickstart 

        

        kafka PHP官方扩展文档:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html 

        kafka 配置项的官方说明: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 

        Kafka 基础:https://www.jianshu.com/p/0b802b65ef11

        kafka中文教程:http://orchome.com/kafka/index 


        安装: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.setup.html 

        示例: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples.html 


        librdkafka库 : https://github.com/edenhill/librdkafka 

        PHP-Kafka客户端 :https://github.com/arnaud-lb/php-rdkafka 

        低级、高级消费模式:https://www.cnblogs.com/wenhainan/p/10932147.htm



        相关示例:

                https://www.cnblogs.com/wt645631686/p/8303076.html 

                https://www.cnblogs.com/liuxinyustu/articles/12488945.html

冷暖自知一抹茶ck
请先登录后发表评论
  • 最新评论
  • 总共0条评论