Jeffcky 阅读(45) 评论(0)

前言

上一节我们简单介绍了RabbitMQ和在安装后启动所出现的问题,本节我们开始正式进入RabbitMQ的学习,对于基本概念请从官网或者其他前辈博客上查阅,我这里不介绍基础性东西,只会简单提一下,请知悉。

RabbitMQ持久化

在RabbitMQ中存在四种交换机,一是直连交换机(Direct Exchange),二是广播交换机(Fantout Exchange),三是主体交换机(Topic Exchange),四是头交换机(Header Exchange),每种交换机都有其对应场景,后面会一一讲到。任何处理消息的开源库都离不开两个角色,一是生产者(Producer),二是消费者(Consumer),消息从生产者到达消费者需经过三个阶段也就是我们需要知道的三个概念,一是交换机,二是队列,三是绑定,初次见此三者概念感觉很高大上是不是,其实不过是高度抽象了而已,同时看到很多博客上对此三者解释的还是非常深刻,我这里就不过多细说。我们先看别人博客上对整个RabbitMQ架构用图来进行标识,然后对照我对交换机、队列、绑定一言以蔽之进行如下概括您就明白他们具体是干什么了。

交换机是生产者发布消息的入口点,队列是消费者获取消息的容器,绑定是将交换机连接到队列的规则

我们简单过一下消息从生产者到达消费者的大概过程,生产者也就是上述图中的ClientA和ClientB,发送消息到交换机,这里交换机可以是一个或者多个,然后通过路由键绑定到队列,那么消费者如何知道绑定到了哪个队列呢?然后消费者也就是上述Client1、Client2、Client3也要声明交换机、队列、绑定,这样整个过程就串起来了,原理大概就是这样。接下来我们通过代码来实现生产者发送消息(请通过NuGet安装RabbitMQ客户端)。

    public class RabbitMQService
    {
        public IConnection GetRabbitMQConnection()
        {
            var connectionFactory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            return connectionFactory.CreateConnection();
        }
    }

第一步当然是需要创建连接,连接到RabbitMQ服务,接下来则是在我们创建连接的基础上创建通道,然后上述我们所说消息从生产者到达消费者的过程就是在此通道上进行。

            var rabbitMQService = new RabbitMQService();
            var connection = rabbitMQService.GetRabbitMQConnection();
            var model = connection.CreateModel();

接下来则是声明交换机、队列、通过路由键将交换机和队列绑定在一起。

        static void InitialTopicQueue(IModel model)
        {
            model.QueueDeclare("queueDeclare", true, false, false, null);
            model.ExchangeDeclare("exchangeDeclare", ExchangeType.Topic);
            model.QueueBind("queueDeclare", "exchangeDeclare", "routeKey");
        }

如上我们声明队列名称为queueDeclare且该队列持久化,然后声明交换机名称为exchangeDeclare,交换机类型为主题,最后通过QueueBind方法通过routekey路由键将声明的交换机和队列绑定在一起。接下来我们开始发布消息。

            var basicProperties = model.CreateBasicProperties();
            basicProperties.DeliveryMode = 1;
            var payload = Encoding.UTF8.GetBytes("这是来自运行VS2017控制台发出的消息");
            var address = new PublicationAddress(ExchangeType.Topic, "exchangeDeclare", "routeKey");
            model.BasicPublish(address, basicProperties, payload);

然后我们在控制台中运行上述程序,然后在RabbitMQ UI上来查看声明的交换机、队列、绑定的路由键以及承载的消息。

 

 

一切如正常运行,其我们发布的消息在队列中处于Ready状态,接下来我们在服务中关闭RabbitMQ服务代理,然后重启模拟宕机的情况。

 

因为在声明队列中第二个参数可指定该队列是否可持久化,我们指定为True即持久化,所以即使RabbitMQ Broker重启队列依然还在,这个时候我们会发现在队列中的数据被扔掉了,也就是说此时的队列为空。那是因为我们指定消息为非持久化,如下指定传输模式即DeliveryModel属性为1,1代表非持久化,2为可持久化。所以如果我们指定DeliveryMode等于2即使RabbitMQ代理宕机,重启后消息依然还在。

            var basicProperties = model.CreateBasicProperties();
            basicProperties.DeliveryMode = 2;

 同理切换到ExchangeType界面时,此时我们声明的交换机不存在了,如下:

因为声明交换机时ExchangeDeclare方法有重载,第三个参数可指定是否持久化,默认为非持久化,如果我们进行如下指定为持久化,那么和在队列中的消息一样即使RabbitMQ代理重启交换机依然还在,这个就不需要我再过多啰嗦了。

 model.ExchangeDeclare("exchangeDeclare", ExchangeType.Topic, true);

 

从如上演示我们可看出在RabbitMQ中关于持久化,可对交换机(ExChange)、队列(Queue)、消息(Message)分别指定持久化,而且在大部分情况下这也是我们想要的。因为这至少可以保证消息从生产者传递消费者的过程中不会丢失。那么对于持久化和非持久化RabbitMQ是如何进行处理的呢?

持久化:将消息保存到磁盘上,因此即使在服务器重新启动后它们仍然可用,只不过在读取和保存消息时会产生一些额外的开销罢了。

非持久化:将消息被保存在内存中,虽然它们在服务器重启后将会消失,但提供更快的消息处理。

总结

今天我们详细讲解了RabbitMQ中的持久化,我们可从交换机、队列、消息三个层面去设置持久化。这只是小试牛刀,下节开始讨论细枝末节,学习的过程也是发现问题的过程,我们下节再会。