RabbitMQ 高级特性——消息分发

news/2024/11/8 4:59:00 标签: rabbitmq, ruby, 分布式

在这里插入图片描述

文章目录

  • 前言
  • 消息分发
    • RabbitMQ 分发机制的应用场景
      • 1. 限流
      • 2. 负载均衡

前言

当 RabbitMQ 的队列绑定了多个消费者的时候,队列会把消息分发给不同的消费者,每条消息只会发送给订阅列表的一个消费者,但是呢,RabbitMQ 默认是以轮询的方式进行分发的,而不会管消费者是否已经消费并且已经确认了消息,这种方式其实是不合理的,因为每个消费者的消费能力是不同的,如果某个消费者的消费能力很低,那么就会导致其他的消费者已经消费完成所有消息了,但是这个消费者还有很多消息需要消费,这样就会导致消息积压。那么该任何解决这个问题呢?就是我们这篇文章需要讲到的消息分发。

消息分发

RabbitMQ 的消息分发机制主要分为两种:1. 轮询分发 2. 非公平分发

  1. 轮询分发
  • 在默认情况下,RabbitMQ采用轮询的方式将队列中的消息分发给消费者。这意味着如果有多个消费者订阅了同一个队列,RabbitMQ会尝试公平地将消息依次分发给每个消费者。
  • 轮询分发机制确保了消息在多个消费者之间的均衡分配,避免了某个消费者过载而其他消费者空闲的情况。
  1. 非公平分发
  • 为了更好地控制消息的分发过程,RabbitMQ提供了非公平分发的机制。在这种机制下,消费者可以通过设置basic.qos方法并指定prefetch_count参数来限制RabbitMQ一次性发送给它的消息数量。
  • 通过调整prefetch_count的值,消费者可以根据自己的处理能力来控制消息的分发速度,从而避免因为处理速度不同而导致的消息堆积或空闲。

RabbitMQ 分发机制的应用场景

消息分发的常见应用场景有两个:

  1. 限流
  2. 负载均衡

1. 限流

每逢双十一或者其他节日的时候,某些购物平台的订单量会激增,这样就会导致单个服务器接收的订单数量超过了能够承受的范围,所以为了保证我们的订单服务器能够正常运行不发生宕机故障,就需要对服务器接收的消息数量做出限制。

那么如何实现限流的功能呢?我们通过设置 prefetchCount 参数并且设置确认方式为手动确认,prefetchCount 就是控制消费者从队列中预取消息的数量,以此来实现限流和负载均衡。通过设置这个配置,就可以保证消费者中最多只能存在 prefetchCount 个未确认的消息。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 设置确认方式为手动确认
        prefetch: 5 # 限制消费者只能接收5条消息
public static final String QOS_EXCHANGE = "qos.exchange";
public static final String QOS_QUEUE = "qos.queue";

声明交换机、队列和绑定关系:

@Configuration
public class QosConfig {
    @Bean("qosExchange")
    public Exchange qosExchange() {
        return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
    }

    @Bean("qosQueue")
    public Queue qosQueue() {
        return QueueBuilder.durable(Constants.QOS_QUEUE).build();
    }

    @Bean("qosBinding")
    public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
    }
}

生产者:

@RequestMapping("/qos")
public String qos() {
    for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","rabbitmq qos" + i);
    }
    return "消息发送成功";
}

消费者:

@Component
public class QosListener {
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void listener1(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("消费者1接收到消息:" + new String(message.getBody()) + ".deliveryTag:" + deliveryTag);
        //channel.basicAck(deliveryTag,false); 手动确认消息,我们这里不确认,看看在没有确认的情况下,队列会向消费者投递多少条消息
    }

    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void listener2(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("消费者2接收到消息:" + new String(message.getBody()) + ".deliveryTag:" + deliveryTag);
        //channel.basicAck(deliveryTag,false); 手动确认消息,我们这里不确认,看看在没有确认的情况下,队列会向消费者投递多少条消息
    }
}

在这里插入图片描述

在这里插入图片描述

可以看到通过设置 prefetchCount 可以实现限流的效果,而如果我们讲这个配置给注掉的话,那么队列中的消息会全部打给消费者:

在这里插入图片描述

2. 负载均衡

因为每个服务器的处理业务能力不同,有的服务器处理业务速度很快,而有的服务器处理业务的速度则很慢,如果按照轮询的方式分发消息的话,就会出现某些服务器很忙,有些服务器处理完成业务之后很闲的情况,对于这种情况,我们可以通过设置 prefetchCount 的值为 1 来实现负载均衡。

只有当消费者处理完成消息并且手动确认之后,队列才会继续向其发送下一条消息。

我们修改 prefetchCount 的值为 1,然后其他的代码不需要修改,只是通过 Thread.sleep() 方法来模拟出消费者消费速度不同的情况:

@Component
public class QosListener {
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void listener1(Message message, Channel channel) throws IOException, InterruptedException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        Thread.sleep(1000);
        System.out.println("消费者1接收到消息:" + new String(message.getBody()) + ".deliveryTag:" + deliveryTag);
        channel.basicAck(deliveryTag,false); 
    }

    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void listener2(Message message, Channel channel) throws IOException, InterruptedException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        Thread.sleep(2000);
        System.out.println("消费者2接收到消息:" + new String(message.getBody()) + ".deliveryTag:" + deliveryTag);
        channel.basicAck(deliveryTag,false); 
    }
}

在这里插入图片描述
通过设置 prefetchCount 的值为 1,就可以实现出负载均衡的效果。


http://www.niftyadmin.cn/n/5743252.html

相关文章

Eslint 和 Prettier

提示&#xff1a;ESLint 和 Prettier 是两个常用的工具&#xff0c;它们在 JavaScript 生态系统中扮演着重要角色&#xff0c;但它们的功能和目的有所不同。 一、ESLint是什么&#xff1f; 1.目的&#xff1a; ESLint 是一个静态代码分析工具&#xff0c;主要用于查找和修复 …

WPF中的INotifyPropertyChanged接口

INotifyPropertyChanged 是一个在 WPF (Windows Presentation Foundation) 和 .NET 中使用的接口&#xff0c;它用于实现数据绑定时的数据更新通知。当实现了 INotifyPropertyChanged 接口的类的属性值发生变化时&#xff0c;这个接口允许对象通知绑定到该对象属性的 UI 元素&a…

【华为机试题】光伏场地建设规划 [Python]

题目 代码 class Solution:def func(self, input_args, area_list):count 0for i in range(input_args[0] - input_args[2] 1):for j in range(input_args[1] - input_args[2] 1):count 1 if self.area_compute(area_list,i,j,input_args[2],input_args[3]) else 0print(c…

Mac解决 zsh: command not found: ll

Mac解决 zsh: command not found: ll 文章目录 Mac解决 zsh: command not found: ll解决方法 解决方法 1.打开bash_profile 配置文件vim ~/.bash_profile2.在文件中添加配置&#xff1a;alias llls -alF键盘按下 I 键进入编辑模式3. alias llls -alF添加完配置后&#xff0c;按…

数据仓库之 Atlas 血缘分析:揭示数据流奥秘

Atlas血缘分析在数据仓库中的实战案例 在数据仓库领域&#xff0c;数据血缘分析是一个重要的环节。血缘分析通过确定数据源之间的关系&#xff0c;以及数据在处理过程中的变化&#xff0c;帮助我们更好地理解数据生成的过程&#xff0c;提高数据的可靠性和准确性。在这篇文章中…

在vscode中开发运行uni-app项目

确保电脑已经安装配置好了node、vue等相关环境依赖 进行项目的创建 vue create -p dcloudio/uni-preset-vue 项目名 vue create -p dcloudio/uni-preset-vue uni-app 选择模版 这里选择【默认模版】 项目创建成功后在vscode中打开 第一次打开项目 pages.json 文件会报错&a…

Python学习大纲总结及注意事项

1. Python基础 • Python基础语法&#xff1a;变量、常量、数据类型&#xff08;字符串、整数、浮点数、布尔值&#xff09;、运算符、表达式。• 控制结构&#xff1a;条件语句&#xff08;if-elif-else&#xff09;、循环语句&#xff08;for循环和while循环&#xff09;、循…

恢复rm -rf删除的数据

注&#xff1a;本文演示的是ext4文件系统格式数据恢复 系统版本&#xff1a;ubuntu16.04 恢复数据目录&#xff1a;数据盘&#xff08;非根&#xff09;目录 恢复工具&#xff1a;extundelete 0.2.4 恢复所有被删除数据 ext4magic 恢复指定目录数据 一、注意事项&#xff1a; …