php连接RabbitMQ消息队列

        RabbitMQ是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为 面向消息的中间件)。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。


        AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受 客户端/中间件 不同产品,不同的开发语言等条件的限制。


        Rabbitmq 是一个功能很强大消息队列系统,使用起来可能不像某些 push 、pop 类型的队列简单(比如 redis 的list),Rabbitmq 支持消息的订阅发布模式,方便大型系统各个服务组件之间解耦和通信。我们首先要了解一些基本概念。


RabbitMQ中的重要概念:        

Connection:即连接, 与你日常理解的连接没有什么不同,比如 redis的连接,mysql的连接
Channel:即通道, 可以理解为一个连接中的子通道,想象一条高速公路,可能是 4车道的,也可能是 8 车道的,这些车道就是 Channel。
Exchange : 交换机,如果你知道交换机设备的话,它的原理跟交换机是基本一样的。
routingkey : 用来绑定交换机和队列的一个字符串。发送消息时需要指定 routingKey, 绑定了此 routingKey的队列将接收到此消息。
queue队列: 消息经过交换机,最终发送到队列中。

        

        冷暖自知一抹茶ck

        (1)Broker:经纪人。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的RabbitMQ Server当作Broker。

        (2)Exchange:消息交换机。指定消息按照什么规则路由到哪个队列Queue。

        (3)Queue:消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。

        (4)Binding:绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。

        (5)RoutingKey:路由关键字。Exchange根据RoutingKey进行消息投递。

        (6)Vhost:虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。

        (7)Producer:消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。

        (8)Consumer:消息消费者。消息的接收者,一般是独立的程序。

        (9)Channel:消息通道,也称信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。


RabbitMQ的使用流程

        AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式投递到相应的Queue上,Queue又将消息发送给已经在此Queue上注册的consumer,消息从queue到consumer有push和pull两种方式。

        消息队列的使用过程大概如下: 

    (1)客户端连接到消息队列服务器,打开一个channel。 

    (2)客户端声明一个exchange,并设置相关属性。 

    (3)客户端声明一个queue,并设置相关属性。 

    (4)客户端使用routing key,在exchange和queue之间建立好Binding关系。 

    (5)生产者客户端投递消息到exchange。 

    (6)exchange接收到消息后,就根据消息的RoutingKey和已经设置的binding,进行消息路由(投递),将消息投递到一个或多个队列里。 

    (7)消费者客户端从对应的队列中获取并处理消息。

        

工作过程:

        生产者客户端:

  1.         客户端连接到RabbitMQ服务器上,打开一个消息通道(channel); 

  2.         客户端声明一个消息交换机(exchange),并设置相关属性。 

  3.         客户端声明一个消息队列(queue),并设置相关属性。 

  4.         客户端使用routing key在消息交换机(exchange)和消息队列(queue)中建立好绑定关系。 

  5.         客户端投递消息都消息交换机(exchange)上 客户端关闭消息通道(channel)以及和服务器的连接。

        服务器端:

        exchange接收到消息后,根据消息的key(这个key的产生规则暂时没研究,有知道的小伙伴可以留言告诉我)和以及设置的binding,进行消息路由,将消息投递到一个或多个消息队列中。

        关于exchange也有几个类型:

        (1). Direct交换机:完全根据key进行投递。例如,绑定时设置了routing key为abc,客户端提交信息提交信息时只有设置了key为abc的才会投递到队列; 

        (2).Topic交换机:在key进行模式匹配后进行投递。例如:符号”#”匹配一个或多个字符,符号”*”匹配一串连续的字母字符,例如”abc.#”可以匹配”abc.def.ghi”,而”abc.*”只可以匹配”abc.def”。 

        (3).Fanout交换机:它采取广播模式,消息进来时,将会被投递到与改交换机绑定的所有队列中。


RabbitMQ的消息持久化

        RabbitMQ支持数据持久化,也就是把数据写在磁盘上,可以增加数据的安全性。消息队列持久化包括三个部分:

        1.消息交换机(exchange)持久化,在声明时指定durable为1

        2.消息队列(queue)持久化,在声明时指定durable为1

        3.消息持久化,在投递时指定delivery_mode为2(1是非持久化)


        如果消息交换机(exchange)和消息队列(queue)都是持久化的话,那么他们之间的绑定(Binding)也是持久化的。如果消息交换机和消息队列之间一个持久化、一个非持久化,那么就不允许绑定。


RabbitMQ的优缺点

适用场景:

        比较适合异步传输,这里解释一下什么是异步和同步。

        异步:发送方不关心消息有没有发送成功,只发送消息,不去获取消息是否发送成功。

        同步:发送方关心消息是否发送成功,发送消息后,会等待接收方返回状态码,根据状态码来判断是否发送成功,然后执行相对于的动作。

        下边以Http中的同步和异步为例: 

        如:普通的B/S架构客户端和服务器端之间的通信就是同步的,即提交请求 ---> 等待服务器处理完毕返回消息 ---> 拿到服务器返回的消息,处理完毕。 

        如:Ajax技术就是异步的,请求通过事件触发 ---> 服务器处理(浏览器不用等待,仍可以做其他的事情) ---> 处理完毕。 有人可能会好奇说应用场景怎么说到了同步和异步,那说明你还不是很理解技术和应用场景之间的紧密联系。


优点:

    (1)由Erlang语言开发,支持大量协议:AMQP、XMPP、SMTP、STOMP。

    (2)支持消息的持久化、负载均衡和集群,且集群易扩展。

    (3)具有一个Web监控界面,易于管理。

    (4)安装部署简单,上手容易,功能丰富,强大的社区支持。

    (5)支持消息确认机制、灵活的消息分发机制。

缺点:

    (1)由于牺牲了部分性能来换取稳定性,比如消息的持久化功能,使得RabbitMQ在大吞吐量性能方面不及Kafka和ZeroMQ。

    (2)由于支持多种协议,使RabbitMQ非常重量级,比较适合企业级开发。


        因此当需要一个稳定的、高可靠性的、功能强大且易于管理的消息队列可以选择RabbitMQ。如果对消息吞吐量需求较大,且不在乎消息偶尔丢失的情况可以使用Kafka。




    发布消息:

    新建文件send.php

<?php
$conn = [
    // Rabbitmq 服务地址
    'host' => '127.0.0.1',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
];

//创建连接和channel
$conn = new AMQPConnection($conn);
if(!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

// 用来绑定交换机和队列
$routingKey = 'key_1';

$ex = new AMQPExchange($channel);
//  交换机名称
$exchangeName = 'ex1';
$ex->setName($exchangeName);

// 设置交换机类型
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 设置交换机是否持久化消息
$ex->setFlags(AMQP_DURABLE);

for($i=0; $i<5; ++$i){
    echo "Send Message:".$ex->publish(date('H:i:s')."用户".$i."注册" , $routingKey )."\n";
}

    消费消息:

    新建文件receive.php

<?php
$conn = [
    // Rabbitmq 服务地址
    'host' => '127.0.0.1',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
];


//创建连接和channel
$conn = new AMQPConnection($conn);
if(!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
$exchangeName = 'ex1';

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($exchangeName);

$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化

//  创建队列
$queueName = 'queue1';
$q = new AMQPQueue($channel);
$q->setName($queueName);
$q->setFlags(AMQP_DURABLE);
$q->declareQueue();

// 用于绑定队列和交换机,跟 send.php 中的一致。
$routingKey = 'key_1';
$q->bind($exchangeName,  $routingKey);

//接收消息
$q->consume(function ($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //处理消息
}, AMQP_AUTOACK);

$conn->disconnect();


    运行代码:

    启动rabbitmq服务。

    一开始队列是不存在的,我们需要先启动 consume.php 来初始化队列,打开命令行页面,运行

php receive.php

    启动后 , php receive.php 将阻塞监听队列消息。

    然后打开新的命令行窗口运行:

php send.php

    然后在新 receive.php 所在的终端窗口将看到接收到的消息。

冷暖自知一抹茶ck



冷暖自知一抹茶ck
请先登录后发表评论
  • 最新评论
  • 总共0条评论