Linux kafka服务安装及rdkafka扩展使用

        

        Kafka 是一种高吞吐量的分布式发布订阅消息系统。(Apache Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统, 使用 Scala 与 Java 语言编写,能够将消息从一个端点传递到另一个端点。)

        较之传统的消息中间件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、内置分区、支持消息副本和高容错的特性,非常适合大规模消息处理应用程序。

        

        PHP下面有通用的两种方式来调用 Kafka。

                php-rdkafka 扩展

                        以 PHP 扩展的形式进行使用是非常高效的。另外,该项目也提供了非常完备的 文档

                kafka-php 扩展包

                        Kafka-php 使用纯粹的 PHP 编写的 Kafka 客户端,目前支持 0.8.x 以上版本的 Kafka。由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本。


【安装 Kafka 服务】

        官网:http://kafka.apache.org/downloads.html   ,下载二进制资源(Binary downloads)

冷暖自知一抹茶ck

        

        1.下载、解压

[root@localhost ~]# wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
[root@localhost ~]# mv kafka_2.13-2.6.0.tgz /usr/local/src/
[root@localhost ~]# cd /usr/local/src/
[root@localhost src]# tar -zxvf kafka_2.13-2.6.0.tgz 
[root@localhost src]# cd kafka_2.13-2.6.0/

        2.启动 Kafka 服务

        2.1 启动Zookeeper server(使用安装包中的脚本启动单节点 Zookeeper 实例)

[root@localhost kafka_2.13-2.6.0]# bin/zookeeper-server-start.sh config/zookeeper.properties &
或 
[root@localhost kafka_2.13-2.6.0]# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties -daemon 可启动后台守护模式

        2.2 启动Kafka server(使用 kafka-server-start.sh 启动 kafka 服务)

[root@localhost kafka_2.13-2.6.0]# bin/kafka-server-start.sh config/server.properties &
或 
[root@localhost kafka_2.13-2.6.0]# bin/kafka-server-start.sh -daemon config/server.properties

        2.3 运行生产者producer

# 启动一个生产者(输入消息)
[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

        2.4 运行消费者consumer

# 启动一个消费者(等待消息) 
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --property print.key=true --topic test

        3.当有跨机的producer或consumer连接时

        需要配置config/server.properties的host.name,要不然跨机的连不上。


【rakafka扩展安装】

        rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka

        

        【安装librdkafka 库】https://github.com/edenhill/librdkafka 

[root@localhost ~]# git clone https://github.com/edenhill/librdkafka.git
[root@localhost ~]# mv librdkafka/ /usr/local/src/
[root@localhost ~]# cd /usr/local/src/
[root@localhost src]# cd librdkafka/
[root@localhost librdkafka]# ./configure
[root@localhost librdkafka]# make
[root@localhost librdkafka]# make install

        【安装php-rdkafka 扩展】https://github.com/arnaud-lb/php-rdkafka 

[root@localhost ~]# git clone https://github.com/arnaud-lb/php-rdkafka.git
[root@localhost ~]# mv php-rdkafka/ /usr/local/src/
[root@localhost ~]# cd php-rdkafka/
[root@localhost ~]# cd /usr/local/src/
[root@localhost src]# cd php-rdkafka/	

#生成configure文件
[root@localhost php-rdkafka]# /usr/local/php7/bin/phpize
Configuring for:
PHP Api Version:         20180731
Zend Module Api No:      20180731
Zend Extension Api No:   320180731


#编译安装
[root@localhost php-rdkafka]# ./configure --with-php-config=/usr/local/php7/bin/php-config
[root@localhost php-rdkafka]# make && make install
....
Installing shared extensions:     /usr/local/php7/lib/php/extensions/no-debug-non-zts-20180731/

#在php.ini 文件中配置 rdkafka扩展
##注意 php.ini实际加载路径
[root@localhost php-rdkafka]# vim /usr/local/php7/etc/php.ini
extension=rdkafka
[root@localhost php-rdkafka]# systemctl restart  php-fpm.service

#查看扩展是否生效
[root@localhost php-rdkafka]# php -m | grep kafka


【编码实现】

        1.【生产者】-- 新建 producer.php

<?php
/**
 * 消息生产者
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Producer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

// 从终端接收输入 
$oInputHandler = fopen('php://stdin', 'r');

while (true) {
    echo "\nEnter  messages:\n";
    $sMsg = trim(fgets($oInputHandler));

   // 空消息意味着退出
    if (empty($sMsg)) {
        break;
    }

    /**
     * 发送消息
     *
     * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
     * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
     * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
     * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
     * The message payload can be anything.
     * 消息可以是任何内容。
     */
    $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}

echo "done\n";


        2. 【检验发送是否成功】

        终端开启一个消费者:

        # 因为生产者会往test的topic中发送消息,消费者直接消费test即可

[root@localhost kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

生产者端发送:	[root@localhost ~]# php producer.php
Enter messages:
one

Enter messages:
two

消费者端接收:	[root@localhost ~]# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
one
two

        

        3.【消费者】-- 新建 consumer.php

<?php

/**
 * 消费者消费消息
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
   // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
            continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "\n";
	break;
    } else {
        echo $oMsg->payload, "\n";
    }
}

        4. 【检验】

生产者端发送:	[root@localhost ~]# php producer.php
Enter messages:
123
						
Enter messages:
456
		
消费者端接收:	[root@localhost ~]# php consumer.php
123
456		
	

        

    注意:

           消息是异步的,死循环是等待消息发送成功,相关其他处理方发:https://www.jianshu.com/p/cf74346acf13 

冷暖自知一抹茶ck




相关文档:


        官方文档:http://kafka.apache.org/documentation/#quickstart 

        

        kafka PHP官方扩展文档:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html 

        kafka 配置项的官方说明: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 


        安装: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.setup.html 

        示例: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples.html 


        librdkafka库 : https://github.com/edenhill/librdkafka 

        PHP-Kafka客户端 :https://github.com/arnaud-lb/php-rdkafka 

        低级、高级消费模式:https://www.cnblogs.com/wenhainan/p/10932147.html 

冷暖自知一抹茶ck
请先登录后发表评论
  • 最新评论
  • 总共0条评论