RabbitMQ

37

介绍

MQ意为消息队列或消息中间件,通过生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断从队列中获取消息,消息的生产和消费都是异步的,只关心消息的发送和接收,没有业务逻辑的侵入,实现了多个系统之间调用的解耦,通过高效的消息传递机制和平台无关的数据交流,基于数据通信来进行分布式系统集成

RabbitMQ

使用Erlang语言开发的消息队列,基于AMQP协议实现,AMQP主要特征面向消息、队列、路由(包括点对点和发布订阅)、可靠、安全。性能在MQ中一般但对数据的一致性稳定性是所有MQ中要求最高的。可以与Spring/SpringBoot无缝整合

应用场景

在用户注册的业务需求中,传统的做法有两种

  • 串行方式 将注册信息写入数据库后,再发送注册邮件和注册短信以上三个任务全部完成后才返回给客户端。但是邮件和短信并不是必须的只是一个通知而这种做法会让客户端等待响应

  • 并行方式 将注册信息写入数据库后发送邮件的同时发送短信,以上三个任务完成后才返回给客户端,并行方式能一定程度上提高处理时间,但是还是无法解决串行方式的问题

引入消息队列

  • 消息队列 将发送短信和邮件等非必须的业务逻辑抽离开来,进行异步处理

使用了消息队列的广播模型后,用户的相应时间就几乎等于写入数据库的时间,写入消息队列的时间可以忽略不计

并且消息队列能保证消息的可靠投递,进行合理的配置就算一个服务挂掉也不会导致消息丢失

在一些高并发场景下,因为流量过大会导致程序宕机,引入消息队列后,可以控制一定流量,更多的连接会被直接丢弃,缓解短时间高流量压垮程序

AMQP协议

Advanced message queuing protocol用来解决金融领域不同平台之间的消息传递交互问题,AMQP不是从API进行限定的,而是直接定义网络交换的数据格式,这便使得AMQP的天然的跨平台性

amqp

生产者连接交换机需要交换机名

交换机连接队列需要路由Key,如果生产者直接与队列连接也需要路由Key

消费者绑定队列需要队列名

发布订阅这种特殊模式路由Key无意义

使用

RabbitMQ官网

RabbitMQ是基于erlang,socat是erlang的内存管理程序

# 下载并安装
yum install -y erlang
yum install -y rabbitmq-server
yum install -y socat

配置

安装后配置文件的模板在以下目录中

请通过find命令找到rabbitmq-server的文件夹名

# 移动到指定配置目录下,配置文件损坏可以重复此操作
cd /usr/share/doc/rabbitmq-server/rabbitmq.config.example
cp ./rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

修改配置,开启来宾用户访问权限

vim rabbitmq.config
# 大约在60行左右,删除注释和逗号
# %% {loopback_users, [<<"guest">>]},
 {loopback_users, [<<"guest">>]}

插件

# 查看插件开启情况
rabbitmq-plugins list
# 开启mq的web管理控制插件
rabbitmq-plugins enable rabbitmq_management

启动

ip:15672访问

处于安全的考虑,guest这个默认用户在一些版本中只能通过localhost来登录

# 启动
systemctl start rabbitmq-server
# 后台启动
rabbitmq-server -detached
# 查看用户列表
rabbitmqctl list_users
# 创建新用户
rabbitmqctl add_user [username] [password]
# 赋予权限
rabbitmqctl set_user_tags [username] administrator
rabbitmqctl set_permissions -p / [username] ".*" ".*" ".*"

基于命令行

一般情况下使用可视化界面而非命令行

# 查看rabbitmq的命令帮助文档
rabbitmqctl help
# 设置内存绝对值
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
# 设置硬盘剩余容量预警
rabbitmqctl set_disk_free_limit 100GB

修改配置文件

# 内存换页
# 当内存达到极限之前将数据转移到硬盘中
vm_memory_high_watermark_paging_ratio = 0.7

使用

web页面概述

Overview概述

Connections连接情况

Channel通道

Exchange交换机

Queues队列

Admin用户

MQ配置

创建虚拟主机

构建虚拟主机

创建新用户,设置为管理权限

创建新用户

配置用户,赋予权限访问虚拟主机,所有权限都选择默认

配置用户

配置用户2

引入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>

Channel API参考

队列声明

queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
  • queue 指向一个通道,如果没有则创建

  • durable 用来定义队列特性是否需要持久化(如果设置为true,mq关闭前会将队列进行持久化,不过队列中的消息还是会丢失)

  • exclusive 是否独占队列(如果设置为true则当前队列只能在当前通道所绑定,其它的连接使用该队列会抛出异常)

  • autoDelete 消费者与队列断开连接后自动删除该队列

  • arguments 额外参数配置

发布消息

basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  • exchange 交换机名称(空格字符串意味使用rabbitMQ的默认交换机,或者可以理解为不使用交换机)

  • routingKey 路由key (不使用可以输入空字符串)

  • props 额外设置

MessageProperties.PERSISTENT_TEXT_PLAIN 队列关闭以后会对队列中的消息进行持久化

  • body 传递的消息

消费消息

basicConsume(String queue, boolean autoAck, Consumer callback)
  • queue 消费队列的名称

  • autoAck 开始消息自动确认机制

  • callback 消费时的回调类

消息确认

basicAck(long deliveryTag, boolean multiple)
  • deliveryTag 确认队列中是哪一个具体的消息

可以通过handleDelivery方法中的的envelope.getDeliveryTag拿到

  • multiple 是否开启多个消息同时确认(如果设置为false将改为手动确认)

交换机声明

exchangeDeclare(String exchange, String type)
  • exchange 交换机名称

  • type 交换机类型

fanout 广播模式

direct 路由模式

交换机与队列绑定

queueBind(String queue, String exchange, String routingKey)
  • queue 队列名称

  • exchange 交换机名

  • routingKey 路由key

Hello World 简单的点对点

注意不要使用任何测试框架测试mq的任何代码,因为mq涉及异步和多线程 !

官方文档

生产者

生产者源源不断地生产消息进入队列中,消费者不断的从队列中取出消息进行消费

这种属于直连

直连

public static void main(String[] args) throws IOException, TimeoutException {
    // 创建MQ的连接对象工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 配置工厂
    factory.setHost("[IP地址]");
    factory.setPort(5672); // 默认端口号为5672
    factory.setVirtualHost("/ems");
    // 上面截图中的配置,访问/ems虚拟主机的用户名和密码
    factory.setUsername("ems");
    factory.setPassword("123");
    // 工厂构建连接对象
    Connection connection = factory.newConnection();
    // 获取连接中的通道
    Channel channel = connection.createChannel();
    // 需要将通道与消息队列对应上,如果不存在此队列名rabbitMQ会帮助创建
    channel.queueDeclare("hello", false, false, false, null);
    // 发布消息
    channel.basicPublish("", "hello", null, "Hello,rabbitMQ!".getBytes());
    // 关闭
    channel.close();
    connection.close();
}

运行后可以看到队列中新增的记录(只有配置了/ems的用户才能看到)

查看管道

消费者

运行后控制台打印Hello,rabbitMQ!

使用DefaultConsumer,收到消息会调用内部的handleDelivery,默认是空方法所以需要重写

public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("[IP地址]");
    factory.setVirtualHost("/ems");
    factory.setUsername("ems");
    factory.setPassword("123");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 生产者和消费者的对队列的定义需要完全一致
    channel.queueDeclare("hello", false, false, false, null);
    // 消费消息
    channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        @Override
        // 收到消息时调用
        // body从消息队列中取出的消息
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
            System.out.println(new String(body));
        }
    });
    // 消费者调用close将导致程序终止
}

运行后消费者会不断从消息队列中取出消息并消费

消费消息

工作队列 Work queues

多个消费者从消息队列中消费消息

任务模型

生产消息的速度远远大于消费速度,长此以往消息就会堆积的越来越多无法及时处理

让多个消费者绑定到一个队列共同消费队列中的消息,队列中的消息一旦被消费就会消失因此任务不会被重复执行

工作队列与Hello World的代码一致,只是同时运行了多个消费者

生产者

// 冗余代码省略,可以将他们封装到工具类中或者放入spring的组件中等
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 100; i++) {
    channel.basicPublish("", "work", null, "Hello,Work queue!".getBytes());
}
// 关闭省略

消费者

运行后可以发现多个消费者消费的消息数量都是一致的,rabbitMQ的工作队列使用了轮询

缺点是如果一个消费者运行速度较慢,会降低整个系统运行速度

// 冗余代码省略,可以将他们封装到工具类中或者放入Spring的组件中等
// 消费者2,3等代码省略
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        System.out.println("消费者1");
        System.out.println(new String(body));
    }
});

消息确认机制

解决工作队列的缺点,设置每次只能允许执行一个消息,否则rabbitMQ会一次性将所有的消息直接平均分配给每一个消费者(如果出现宕机,消息将会丢失)

channel.basicQos(1);

然后关闭自动确认改为每次接受完手动确认

// autoAck设置为false
channel.basicConsume("work", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者1");
        System.out.println(new String(body));
        // 手动确认消息接受完毕,只要没被确认消息就会保留在消息的Unacked中
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

执行的快的先确认,这样就实现了能者多劳、按需分配的效果

发布订阅Publish/Subscribe

使用了fanout模型,也称为广播

广播

可以有多个消费者,每个消费者都有自己的队列,每个队列都需要绑定交换机

生产者发送消息只能发送到交换机,交换机来决定发送给哪个队列,生产者无法决定

交换机把消息发送给绑定过的所有队列,队列的消费者都能拿到消息,实现一条消息被多个消费者消费(所以无需路由Key)

创建交换机

Channel channel = connection.createChannel();
// 声明交换机 fanout为广播模型的交换机
channel.exchangeDeclare("mengnan", " fanout");
// 发送消息给交换机
channel.basicPublish("mengnan", "", null, "Hello Fanout!".getBytes());

交换机

创建临时队列

生产者发布一条消息,所有消费者都能消费这一条消息

Channel channel = connection.createChannel();
// 绑定(channel.exchangeBind)或声明交换机
channel.exchangeDeclare("mengnan", "fanout");
// 创建临时队列并获取它的名称
String queue = channel.queueDeclare().getQueue();
// 将交换机和队列绑定,fanout中的路由Key没有效果
channel.queueBind(queue, "mengnan", "");
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        System.out.println("消费者1");
        System.out.println(new String(body));
    }
});

路由 Routing

在发布订阅的基础上增加一个特性,不同的消息能被指定的多个队列消费

使用了Direct模型,也称为直连

订阅

队列与交换机绑定,需要通过路由Key

消息的发送发在交换机发送消息时还需要指定路由Key

交换机不在把消息交给每一个队列而是通过路由Key进行判断,只有队列与消息的路由Key一致时才会接受该消息

生产者

Channel channel = connection.createChannel();
// 交换机类型为direct
channel.exchangeDeclare("direct", "direct");
// 关键路由key
String routingKey = "info";
// 发送消息
channel.basicPublish("direct", routingKey, null, ("Hello,direct_" + routingKey + "!").getBytes());

查看路由

消费者1

Channel channel = connection.createChannel();
// direct类型路由
channel.exchangeDeclare("direct", "direct");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 基于路由Key绑定队列和交换机,之后只会取指定路由Key的消息进行消费
channel.queueBind(queue, "direct", "error");
// 获得消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        System.out.println("错误消息" + new String(body));
    }
});

消费者2

// 获取通道、创建direct类型路由、创建临时队列
// 同时候绑定多个
channel.queueBind(queue, "direct", "info");
channel.queueBind(queue, "direct", "error");
channel.queueBind(queue, "direct", "warning");
// 获得消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        System.out.println("所有日志消息" + new String(body));
    }
});

当生产者的路由key为error时消费者1和消费者2都能接收到消息,info和warning只有消费者2能接收到

订阅 Topics

动态路由

普通路由如果要进行逻辑判断或者大量的队列通过queueBind绑定,这样可能导致代码冗余

与路由的区别就是在绑定路由Key的时候可以使用通配符

这种模型的路由Key都是由多个单词组合而成,多个单词以”.“分割

订阅

通配符匹配规则

  • * 匹配一个单词

  • # 匹配零或多个单词

生产者

Channel channel = connection.createChannel();
// 交换机类型为 topic
channel.exchangeDeclare("topic", "topic");
String routeKey = "a.b.orange";
channel.basicPublish("topic", routeKey, null, "Hello,topic!".getBytes());

消费者

Channel channel = connection.createChannel();
channel.exchangeDeclare("topic", "topic");
// 临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "topic", "*.*.orange");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        System.out.println(new String(body));
    }
});

与SpringBoot整合

Springboot天生支持AMQP协议,并且对rabbitmq进行了再封装

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

spring:
  rabbitmq:
    host: [ip地址]
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems

Springboot提供了RabbitTemplate用来就简化操作,可以在项目中直接注入使用

@Autowired
private RabbitTemplate rabbitTemplate;

Hello world

生产者

// 第一个参数是路由Key,与管道名对应
rabbitTemplate.convertAndSend("hello", "hello world!");

springboot_helloworld

消费者

默认非独占,持久化,非自动删除的队列

可能需要先手动在Web上创建该队列名

@Component
// 声明这是消费(监听)者
// 声明队列为hello,declare是否持久化,autoDelete是否自动删除(注意接收到的不是布尔值而是字符串)
@RabbitListener(queuesToDeclare = @Queue(value = "hello",
        declare = "false",
        autoDelete = "true"))
public class HelloCustomer {
    // 接收到消息的Handler
    @RabbitHandler
    public void receive(String msg) {
        System.out.println(msg);
    }
}

工作队列

生产者代码将路由key改为对应

@Component
public class WorkCustomer {
    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
    public void receive1(String msg) {
        System.out.println("msg" + msg);
    }
    // 多个监听者略
}

公平分发

只有处理完消息后,才会收到下一个消息

# 增加listener.simple.prefetch为1
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

发布订阅

生产者

这里的路由Key无意义,所以使用空字符串

// 第一个参数为交换机名
rabbitTemplate.convertAndSend("fanout", "", "Hello fanout!");

消费者

@Component
public class FanoutCustomer {
    @RabbitListener(bindings = {
            @QueueBinding( // 绑定队列
                    value = @Queue,  // 不指定名称为临时队列
                    exchange = @Exchange(value = "fanout", type = "fanout") // 路由
            )
    })
    public void receive1(String msg) {
        System.out.println(msg);
    }
    // 多个监听者略
}

路由

生产者

rabbitTemplate.convertAndSend("directs", "info", "Hello directs!");

消费者

@Component
public class RouteCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs", type = "direct"), // direct就是默认值
                    key = {"info", "error", "warn"} // 路由Key
            )
    })
    public void receive1(String msg) {
        System.out.println(msg);
    }
    // 多个监听者略
}

订阅

生产者

rabbitTemplate.convertAndSend("topics", "user.save", "Hello topics!");

消费者

@Component
public class TopicCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics", type = "topic"),
                    key = {"user.*"}
            )
    })
    public void receive1(String msg) {
        System.out.println(msg);
    }
    // 多个监听者略
}

声明

以上需要在Web界面中手动创建队列、交换机等

以下代码需要放在消费者端配合@RabbitListener,SpringBoot可以帮助创建队列和交换机

@Configuration
public class FanoutCustomer {
    // 声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    // 声明队列
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }
    
    // 绑定
    @Bean
    public Binding smsBinding() {
        // 队列与交换机绑定
        // 如果交换机类型不是广播模型还可以指定路由Key
        // return BindingBuilder.bind(duanxinQueue()).to(directExchange()).with("apple");
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
}

过期时间

为队列配置

当超过配置的时间后,消息会从消息队列中移除,可以通过配置放入死信队列

并且队列会标注为TTL

@Configuration
public class TTLRabbitMQConfig {
    @Bean
    public DirectExchange ttldirectExchange() {
        return new DirectExchange("ttl_direct_exchange", true, false);
    }

    @Bean
    public Queue directQueue() {
        Map<String, Object> args = new HashMap<>();
        // 配置过期时间
        args.put("x-message-ttl", 5000);
        return new Queue("ttl.direct.exchange", true, false, false, args);
    }

    @Bean
    public Binding ttlBinds() {
        return BindingBuilder.bind(directQueue()).to(ttldirectExchange()).with("ttl");
    }
}

为队列中的消息单独配置

当超过配置的时间后,消息会从消息队列中直接移除

@Configuration
public class TTLRabbitMQConfig {
    @Bean
    public DirectExchange ttldirectExchange() {
        return new DirectExchange("ttl_direct_exchange", true, false);
    }
    
    @Bean
    public Queue directttlMessageQueue() {
        // 就是一个普通队列
        return new Queue("ttl.direct.exchange", true, false, false);
    }

    @Bean
    public Binding ttlBinds() {
        return BindingBuilder.bind(directQueue()).to(directttlMessageQueue()).with("ttlMessage");
    }

}

为每个消息指定

public void makeOrderTtlMessage(String userId, String productId, int num) {
    String orderId = UUID.randomUUID().toString();
    System.out.println("订单生成成功 =》" + orderId);
    String exchangeName = "ttl_direct_exchange";
    String routingKey = "ttlMessage";
    MessagePostProcessor processor = message -> {
        message.getMessageProperties().setExpiration("50000");
        message.getMessageProperties().setContentEncoding("UTF-8");
        return message;
    };
    rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, processor);
}

死信队列

Dead-Leter-Exchange(DLX),死信交换机

当消息在一个队列中变成死信之后会被重新发送到另一个交换机中,这一个交换机就是DLX,绑定了DLX的队列就死新队列

有以下原因,消息被变成死信

  • 消息被拒绝

  • 消息过期

  • 队列达到最大长度

创建死信队列

@Configuration
public class DeadRabbiMQConfig {

    @Bean
    public DirectExchange deadDirect() {
        return new DirectExchange("dead_direct_exchange", true, false);
    }

    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }

    @Bean
    public Binding deadbinds() {
        return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
    }
}

与其它队列建立联系

当该交换机接收到的消息在5秒内没有处理将会发送给死信队列

x-max-length 该队列最多容纳数

x-dead-letter-exchange 指定死信队列交换机

x-dead-letter-routing-key 指定路由key

x-message-ttl 超时丢弃

@Configuration
public class TTLRabbitMQConfig {
    @Bean
    public DirectExchange ttldirectExchange() {
        return new DirectExchange("ttl_direct_exchange", true, false);
    }

    @Bean
    public Queue directQueue() {
        Map<String, Object> args = new HashMap<>();
        // 指定死信队列交换机
        args.put("x-dead-letter-exchange", "dead_direct_exchange");
        // 指定路由key(fanout模式无需配置)
        args.put("x-dead-letter-routing-key", "dead");
        // 超时放入死信队列
        args.put("x-message-ttl", 5000);
        return new Queue("ttl.direct.exchange", true, false, false, args);
    }

    @Bean
    public Binding ttlBinds() {
        return BindingBuilder.bind(directQueue()).to(ttldirectExchange()).with("ttl");
    }
}

集群

Erlang语言天撒具备分布式特性,RabbitMQ天生支持Clustering,这使得它不需要向ActiveMQ、Kafka需要Zookeeper分别来实现HA方案和保存集群元数据。集群是保证可靠性的一种方式、同时可以通过水平扩展达到增加消息吞吐量能力的目的

官方文档

副本集群

是一种主备集群

主节点内的所有数据都可以复制和访问到从节点上,但是消息队列无法复制,消息队列只存在与主节点上

主节点才能和生产者获取消息,丛节点只是为了同步主节点内的交换机的数据,无法同步队列

只有一个主节点,如果主节点宕机,从节点只能做到备份交换机的作用,不会切换为主节点,并且主节点内的队列中的消息会丢失(可以通过允许持久化规避)

在主节点没有宕机的情况下才能为主节点分担压力

主从复制集群

当集群内的主节点宕机,可以对队列内的信息进行备份

部署

官方文档

可以直接使用management版本的镜像,或者手动开启插件

如果登陆时候出现ReferenceError: disable_stats is not defined 清理浏览器缓存

构建集群需要同步erlang cookie

另外节点之间需要互相建立连接连接

# 这是主节点
docker run -d \
    --hostname rabbit \
    --name rabbitMaster \
    -p 5672:5672 -p 15672:15672  \
    -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password \
    -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' \
    rabbitmq:management

# 这是第一个从节点
docker run -d \
	--hostname rabbit2 \
	--name rabbitSlaver \
	-p 5673:5672 -p 15673:15672 \
	-e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password \
	-e RABBITMQ_ERLANG_COOKIE='rabbitcookie' \
	--link rabbitMaster:rabbit \
	rabbitmq:management

# 这是第二个从节点
docker run -d \
	--hostname rabbit3 \
	--name rabbitSlaver2 \
	-p 5674:5672 -p 15674:15672 \
	-e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password \
	-e RABBITMQ_ERLANG_COOKIE='rabbitcookie' \
	--link rabbitMaster:rabbit \
	--link rabbitSlaver:rabbit2 \
	rabbitmq:management

进入容器后查看集群状态

rabbitmqctl cluster_status

从节点停止

rabbitmqctl stop_app

从节加入集群

# 清除该节点的历史数据(否则可能无法加入集群)
rabbitmqctl -n [当前节点主机名] reset
# 加入
rabbitmqctl join_cluster rabbit@[主节点主机名]

恢复服务

rabbitmqctl start_app

配置成功后显示如下

集群配置

主节点添加交换机等操作,其它节点会同步,但是不能同步队列内的消息

如果主节点宕机其它两个节点无法向外界提供服务

消费者在主节点没有宕机的情况下才能从从节点消费消息

镜像集群

官方文档

镜像队列机制就是将队列在三个节点之间设置主从关系,消息在三个节点之间同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用,提升了MQ集群的整体高可用性

是对副本集群的加强

镜像集群

策略说明

rabbitmqctl set_policy -p [vhost] [name] [pattern] [definition]

-p vhost 指定主机下的队列进行设置,默认对所有节点使用该策略(可选)

name 策略的名称

pattern 队列的匹配模式

definition 镜像定义

ha-mode 指定镜像队列的模式

all 集群中所有节点上进行镜像

exactly 在指定个数的节点上进行进行

nodes 在指定的节点上进行进行镜像

ha-params 提供ha-mode中的个数

ha-sync-mode 队列的同步方式

automatic 自动同步

manual 手动同步,在mq输入命令才会同步

priority 策略的优先级(可选)

查看节点当前的策略

rabbitmqctl list_policies

使用策略

rabbitmqctl set_policy ha-all '^Hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'

删除策略

rabbitmqctl clear_policy [策略名]

分布式事务

分布式事务指操作位于不同节点上,需要保证事务的AICD特性

如库存和订单不在一个微服务中就涉及分布式事务

两阶段提交2PC

引入协调者来协调参与者行为,并最终决定这些参与者是否真正参与事务

但是它有一系列问题

  • 同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作

  • 单点问题 协调者2PC起到非常大的作用,发送故障会造成较大影响,特别是在二阶段发生故障时所有参与者会一直等待

  • 数据不一致 如果协调者只发送部分Commit消息,如果网络出现异常那么只会有部分参与者提交事务,最终导致消息不一致

  • 任意一个节点失败就会导致整个事务失败,没有完善的容错机制

补偿事务

争对每个操作都需要注册一个与其对应的确认和补偿操作

  • Try阶段 主要对业务系统做检测及资源预留

  • Commit阶段 主要是对业务系统确认提交,默认不会出错

  • Cancel阶段 主要是在业务执行操作,需要回滚状态下执行业务取消预留资源释放

本地消息表

本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且 使用了消息队列来保证最终致性

  • 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息, 本地事务能保证这个消息一定会被 写入本地消息表中

  • 之后将本地消息表中的消息转发到Kafka等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发

  • 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作

MQ事务消息

RocketMQ支持事务消息类似于二阶段提交。Kafka不支持事务消息

MQ的确认机制

  • 为了确保数据一定会发送到MQ中

  • 在同一事务中添加一个冗余表的记录订单数据每天数据是否发送成功的状态

  • 然后利用RabbitMQ提供的publisher/confirm开启确认机制后,如果消息正常发送到MQ中就会获取到回执消息,然后把状态修改为已发送

开启消息确认机制

spring:
  rabbitmq:
    username: user
    password: password
    virtual-host: /
    # MQ的集群地址
    addresses: 106.14.8.213:5672
    # 开启消息确认机制
    publisher-confirm-type: correlated

可靠生产

将数据存到本地库中

public void saveOrder(Order order) throws Exception {
    if (orderMapper.saveOrder(order.getOrderId(), order.getUserId(), order.getOrderContent()) != 1) {
        throw new Exception("订单创建失败,原因[数据库操作失败]");
    }
    //因为在下单可能会会rabbit会出现宕机,就引发消息是没有放入MQ.为来消息可靠生产,对消息做一次冗余
    saveLocalMessage(order);
}

public void saveLocalMessage(Order order) throws Exception {
    // 添加运动记录, 默认为0
    if (orderMapper.saveLocalMessage(order.getOrderId(), order.getOrderContent(), 0, 1) != 1) {
        throw new Exception("出现异常,原因[数据库操作失败]");
    }
}

消息确认机制

使用了消息确认机制省略@Transactional注解

@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。PostConstruct在构造函数之后执行,init方法之前执行

只能修饰非静态的void方法

@PostConstruct
public void regCallback() {
    // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("cause:" + cause);
        // 如果ack为true代表消息已经收到
        String orderId = correlationData.getId();
        if (!ack) {
            // 这里可能需要进行其他的方式进行存储
            System.out.println("MQ队列应答失败,orderId是:" + orderId);
            return;
        }
        try {
            // 插入到冗余表中
            if (orderMapper.updateStatusByMessage(orderId) != 0) {
                System.out.println("本地消息状态修改成功,消息成功投递到消息队列中...");
            }
        } catch (Exception ex) {
            System.out.println("本地消息状态修改失败,出现异常:" + ex.getMessage());
        }
    });
}
public void sendMessage(Order order) {
    // 通过MQ发送消息
    rabbitTemplate.convertAndSend("order_fanout_exchange", "", JsonUtil.obj2String(order),
            new CorrelationData(order.getOrderId()));
}

可靠消费

消息重试

一般情况下acknowledge-mode和retry二选一

spring:
  rabbitmq:
    username: user
    password: password
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 开启手动ack,让程序去控制MQ的消息的重发和删除和转移
        retry:
          enabled: true  # 开启重试(默认关闭)
          max-attempts: 3  # 最大重试次数(默认3次)
          initial-interval: 2000ms  # 重试间隔时间
    addresses: 106.14.8.213:5672

利用RabbitMQ的ACK机制,由消费者自身控制消息的重发、清除和丢弃

需要考虑幂等性问题、因为定时重发会造成消息的重新发送,可以使用唯一主键或分布式锁

可以通过@Scheduled注解进行定时任务

@RabbitListener(queues = {"order.queue"})
public void messageconsumer(String ordermsg, Channel channel,
                            CorrelationData correlationData,
                            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    try {
        // 获取消息队列的消息
        System.out.println("收到MQ的消息是: " + ordermsg );
        // 获取订单服务信息
        Order order = JsonUtil.string2Obj(ordermsg, Order.class);
        // 获取订单id
        String orderId = order.getOrderId();
        // 保存运单
        dispatchService.dispatch(orderId);
        // 手动确认消息接受完毕
        channel.basicAck(tag, false);
    } catch (Exception ex) {
        // 消息拒收
        // 第三个参数为false不会重发,会把消息打入到死信队列
        // 为true重发,可能导致死循环并且重试机制上限的配置不会生效
        channel.basicNack(tag, false, false);
    }
}

如果消费者出现异常会触发MQ的重试机制

最终导致不断发送消息,直到系统负载过高宕机

解决消息重试的解决方案

  • 控制重发的次数

  • try catch + 手动ACK

  • try catch + 手动ACK + 死信队列

重试次数与try catch只能二选一

死信队列

需要order.queue指定死信队列dead.order.queue

@RabbitListener(queues = {"dead.order.queue"})
public void messageconsumer(String ordermsg, Channel channel,
                            CorrelationData correlationData,
                            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    try {
        System.out.println("收到MQ的消息是: " + ordermsg + ",count = " + count++);
        Order order = JsonUtil.string2Obj(ordermsg, Order.class);
        String orderId = order.getOrderId();
        dispatchService.dispatch(orderId);
        channel.basicAck(tag, false);
        System.out.println("correlationData" + correlationData);
    } catch (Exception ex) {
        // 死信队列不再重发,直接移除
        System.out.println("手动处理消息失败");
        channel.basicAck(tag, false);
    }
}

幂等性

正常业务逻辑会保存订单,死信队列中也会保存订单,可能导致消息重复消费,所以需要考虑幂等性问题

  • 可以通过数据库的唯一id进行插入,每次消费消息时用该id先判断该消息是否已消费过

  • 可以通过分布式锁