0%

RabbitMQ

基础知识

中间件技术

一些常用的中间件技术

  • 分布式消息中间件
    • ActiveMQ
    • RabbitMQ
    • Kafka
    • RocketMQ
  • 负载均衡中间件
    • Nginx
    • LVS负载均衡
    • KeepAlive
    • CDN
  • 缓存中间件
    • MemCache
    • Redis
  • 数据库中间件
    • Mycat
    • ShardingJdbc

MQ解决的问题

  • 流量消峰
  • 应用解耦
  • 异步处理

MQ协议

  • AMQP协议

    特性:

    • 分布式事务
    • 消息持久化
    • 高性能和高可靠的消息处理

    支持RabbitMQ,ActiveMQ

  • MQTT协议

    特性:

    • 轻量级

    • 结构简单

    • 传输快

    • 不支持持久化

    支持RabbitMQ,ActiveMQ

  • OpenMessage协议

    特性:

    • 结构简单
    • 解析速度快
    • 支持事务和持久化

    支持RocketMQ

  • Kafka协议

    特性:

    • 结构简单
    • 解析速度快
    • 不支持事务
    • 可以持久化

    支持Kafka

MQ相关概念

MQ中的角色

image-20210925131710544

生产者、交换机、队列、消费者

RabbitMQ工作原理

image-20210925132032224

开始RabbitMQ

安装和配置

安装

官方下载文档Downloading and Installing RabbitMQ — RabbitMQ

手动安装还需要Erlang语言环境的支持,安装较为繁琐,因此这里使用docker安装

1
2
3
# for RabbitMQ 3.8,
# 3.8.x support timeline: https://www.rabbitmq.com/versions.html
docker run -it --name rabbitmq --restart=always -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management

我们安装的RabbitMQ是带有RabbitManagement的,不需要额外开启功能,如果是手动安装的RabbitMQ需要先停止server再开启服务:

1
rabbitmq-plugins enable rabbitmq_management

进入15672端口,如果没有配置过,则默认用户名和密码都是guest

RabbitMQ用户管理

1
2
3
4
5
rabbitmqctl list_users #查看所有用户
rabbitmqctl add_user 用户名 密码 #添加新用户
#授权格式:
#rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permisssions -p "/" 用户名 ".*" ".*" ".*" #给这个用户这个virtualhost中所有资源的配置和读写权限

一些常用命令

image-20210926181830650

HelloRabbitMQ

注意提前开放5672端口

配置依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<!--RabbitMQ客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.1</version>
</dependency>
<!--操作文件流-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>

<!--添加插件指定SDK版本-->
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Producer {
//队列名
public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置连接工厂
factory.setHost("x.x.x.x");
factory.setUsername("guest");
factory.setPassword("xxx");

//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
* 生成队列
* 1.队列名
* 2.消息是否持久化,默认保存在内存
* 3.是否允许多个消费者消费
* 4.最后一个消费者断开连接后是否自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息
String message = "Hello,RabbitMQ!";
/**
* 发送消息
* 1.发送到哪个交换机
* 2.路由的key是什么(这里使用队列名)
* 3.其他参数信息
* 4.发送消息我的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("==============>发送完成");
//释放资源
channel.close();
connection.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Consumer {
public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置连接工厂
factory.setHost("x.x.x.x");
factory.setUsername("guest");
factory.setPassword("xxx");

//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
* 消费信息
* 1.消费哪个队列
* 2.消费成功后是否自动应答
* 3.消费成功消费的回调
* 4.消费者取消消费的回调
*/
DeliverCallback deliverCallback = (consumeTag, message) -> {
System.out.println(new String(message.getBody()));
};
CancelCallback cancelCallback = consumeTag -> {
System.out.println("消费被中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
//消费者不要释放资源
}
}

实际上消费者的两个失败的回调函数在正常执行过程中并不会触发,而是在不断监听,一旦有新的消息来到就会立即接收

工作模式

简单模式

我们上面做的例子就是简单模式的一个实现,并且是RabbitMQ的默认工作模式

提取工具类RabbitMqUtil.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RabbitMqUtil {
public static Connection connection = null;
public static Channel channel =null;
public static Channel getChannel() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.113.225.244");
factory.setUsername("guest");
factory.setPassword("Zyh20010605");
try {
connection = factory.newConnection();
channel = connection.createChannel();
} catch (Exception e){
e.printStackTrace();
}
return channel;
}

public static void close() {
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
return;
}
}

交换机的类型

  • Fanout:广播
  • Direct:定向
  • Topics:通配符

WokerQueues

image-20210925155336736

工作队列模式:采用轮训的方式,只有一个消费者能够取得队列中的消息,消费者轮流取得消息信道中的信息

Pub/Sub

image-20210925155400376

订阅模式:

X:交换机

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void exchange() throws IOException {
Channel channel = RabbitMqUtil.getChannel();
/**
* 创建交换机
* 1.交换机名称
* 2.交换机类型,通过BuiltinExchangeType对象枚举
* DIRECT:定向
* FANOUT:扇形(广播)
* TOPIC:通配符
* HEADERS:参数匹配
* 3.是否持久化
* 4.是否自动删除
* 5.仅内部使用
* 6.其他参数
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,false,false,null);
//创建队列
String queue1Name = "que1";
String queue2Name = "que2";
channel.queueDeclare("que1",false,false,false,null);
channel.queueDeclare("que2",false,false,false,null);
/**
* 绑定队列和交换机
* 1.队列名称
* 2.交换机名称
* 3.路由key
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
//发布消息
String message = "exchange方法发布的消息";
channel.basicPublish(exchangeName,"", null, message.getBytes());
System.out.println("============>发送完成");
//关闭连接
RabbitMqUtil.close();
}

消费者代码与之前类似

Routing

image-20210925161959160

路由模式:生产者携带的RountingKey与指定类型匹配时才会发送到相应信道

这里仅给出特有的代码实现

1
2
//创建交换机更改枚举类型
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,false,false,null);
1
2
3
4
5
6
//绑定队列1与key:error
channel.queueBind(queue1Name,exchangeName,"error");
//绑定队列2与key:info、error、warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
1
2
//发布消息指定routingKey,这里是error,两个信道都能收到消息
channel.basicPublish(exchangeName,"error", null, message.getBytes());

Topics

image-20210925163321559

通配符模式:将routingkey指定为通配符,不需要完全匹配

这里仅给出特有的代码实现

1
2
//创建交换机更改枚举类型
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,false,false,null);
  • 路由键和绑定键的字符串会切分成单侧,单词之间用.分开
  • #表示匹配0个或多个单词
  • *表示匹配一个单词
1
2
3
4
5
//绑定队列1与key:'*.error'
channel.queueBind(queue1Name,exchangeName,"#.error");
//绑定队列2与key:'user.*'、'*.*'
channel.queueBind(queue2Name,exchangeName,"user.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
1
2
//发布消息指定routingKey,这里是'user.error',两个信道都能收到消息
channel.basicPublish(exchangeName,"user.error", null, message.getBytes());

整合SpringBoot

配置环境

包依赖

1
2
3
4
5
<!--引入amqp协议,即RabbitMQ的依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

1
2
3
4
5
spring:
rabbitmq:
host: x.x.x.x
username: guest
password: xxx

RabbitTemplate

使用RabbitTemplate发送和接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@SpringBootTest
class Springboot02ApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;

/**
* 发送消息
*/
@Test
void sendTest(){
//方式一:rabbitTemplate.send(交换机,key,message)
//这种方式需要我们自定义消息体内容和消息头

//方式二:rabbitTemplate.convertAndSend(交换机,key,object)
//只要传入需要发送的对象,就会被自动序列化并发送给RabbitMQ服务器
HashMap<String, Object> map = new HashMap<>();
map.put("msg", "this is a msg from client");
map.put("list", Arrays.asList(true, 2, "object3"));
rabbitTemplate.convertAndSend("springboot.mq","user.warning",map);
System.out.println("=========>消息发送完成");
}

/**
* 从que1接收消息,对应key:user.*
*/
@Test
void getTest1(){
//接收消息并自动反序列化
Object que1 = rabbitTemplate.receiveAndConvert("que1");
System.out.println(que1.getClass());
System.out.println(que1);
}

/**
* 从que2接收消息,对应key:*.info
*/
@Test
void getTest2(){
Object que2 = rabbitTemplate.receiveAndConvert("que2");
System.out.println(que2.getClass());
System.out.println(que2);
}
}

由于默认使用的是jdk序列化,如果我们需要可以注入我们自己的MessageConverter,使其转换为json序列化

1
2
3
4
5
6
7
@Configuration
public class MQConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}

注意:在使用Json序列化之后我们如果需要手动在网页端发送消息也需要使用Json的序列化格式发送

注解开发RabbitMQ

主类上添加@EnableRabbit以开启注解功能

使用注解监听队列

RabbitListener

  • 方法上使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Service
    public class RabbitService {
    //转换器会将消息内容自动转换为实体类
    @RabbitListener(queues = "que1")
    public void listenQue1(User user){
    System.out.println(user.getClass());
    System.out.println(user.toString());
    }

    //使用Message接收消息能够拿到消息的完整信息
    @RabbitListener(queues = "que2")
    public void listenQue2(Message message){
    System.out.println(message.getClass());
    System.out.println(message.toString());
    }
    }

    image-20210925195337485

  • 类上使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
    * 通过queuesToDeclare来声明队列
    * ,@Queue注解创建临时队列
    * 还可以进行一些que的详细配置
    */
    @Service
    @RabbitListener(queuesToDeclare = @Queue("hello"))
    public class customer {
    //标注这个方法是收到消息后的回调方法
    @RabbitHandler
    public void receive(String message){
    System.out.println(message);
    }
    }

AmqpAdmin

通过AmqpAdmin就可以进行消息队列的一系列操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@SpringBootTest
class Springboot02ApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;

/**
* 创建交换器
*/
@Test
void createExchange(){
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
}

/**
* 创建队列
*/
@Test
void createQue(){
amqpAdmin.declareQueue(new Queue("amqpadmin.que",true));
}

/**
* 创建绑定
*/
@Test
void createBinding(){
amqpAdmin.declareBinding(new Binding("amqpadmin.que", Binding.DestinationType.QUEUE, "amqpadmin.exchange","amqp.info",null));
}
}

使用临时队列

下面两段代码并没有直接关联,只是展示了生产者和消费者不同的实现方式

  • 生产者实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    /**
    * 向容器中注入交换机、队列、Binding组件就会自动临时创建
    */
    @Configuration
    public class priQueue {
    private static final String EXCHANGE = "springboot.mq";

    public static final String QUEUE = "que1";

    private static final String ROUTING_KEY = "user.info";
    //注入交换机
    @Bean
    DirectExchange exchange1(){
    return new DirectExchange(EXCHANGE);
    }
    //注入队列
    @Bean
    Queue queue1(){
    Map<String,Object>map = new HashMap<>();
    map.put("x-max-priority",10);//设置最大的优先级数量
    return new Queue(QUEUE,true,false,false,map);
    }
    //注入Binding
    @Bean
    public Binding binding1(){
    return BindingBuilder.bind(queue1()).to(exchange1()).with(ROUTING_KEY);
    }
    }
  • 消费者实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /**
    * 通过queuesToDeclare来声明队列
    * ,@Queue注解中还可以进行一些详细配置
    */
    @Service
    public class customer {
    //标注这个方法是收到消息后的回调方法
    @RabbitListener(bindings = {
    @QueueBinding(
    value = @Queue, //创建临时队列
    exchange = @Exchange(value = "springboot.mq", type = "direct"),
    key = "user.info"
    )
    })
    public void receive(String message) {
    System.out.println(message);
    }
    }

高级特性

以下的代码都基于SpringBoot实现

消息可靠投递

  • confirm确认模式
  • return退回模式

image-20210925202405082

confirm模式

修改配置文件

1
spring.rabbitmq.publisher-confirm-type: correlated

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Autowired
RabbitTemplate rabbitTemplate;


@Test
void testConfirm(){
//设置ConfirmCallBack
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
/**
* confirm回调函数
* @param correlationData 相关配置信息
* @param ack exchange交换机是否成功接收到了消息
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了");
if (ack){
System.out.println("发送成功!");
} else {
System.out.println("发送失败,失败原因:" + cause);
}
}
});
//发送到不存在的交换机
rabbitTemplate.convertAndSend("inexist.exchange","","test information");
}

return模式

如果消息没有路由到Queue的处理方式:

  • 丢弃消息(默认)
  • 返回给发送方ReturnCallBack

修改配置文件

1
spring.rabbitmq.publisher-returns: true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Autowired
RabbitTemplate rabbitTemplate;


@Test
void testReturn(){
//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
//设置ReturnCallBack
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* return回调函数
* @param returned 这个参数已经封装了返回消息的所有信息,通过get方法获取
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("执行了return方法");
System.out.println(returned.getMessage());
System.out.println(returned.getReplyCode());
System.out.println(returned.getReplyText());
System.out.println(returned.getExchange());
System.out.println(returned.getRoutingKey());
}
});
//发送key不匹配任何队列
rabbitTemplate.convertAndSend("springboot.mq","null.null","test information");
}

Consumer Ack

消费端收到消息之后的确认方式

三种确认方式:

  • 自动确认:acknowledge=”none”
  • 手动确认:acknowledge=”manual”
  • 根据异常类型确认:acknowledge=”auto”

注意只有simple和direct模式可以设置手动确认,需要指定相应的交换器为simple或direct类型

修改配置文件

1
2
3
4
5
6
7
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #这里实际起作用的是simple的配置
direct:
acknowledge-mode: manual

向容器中注入ChannelAwareMessageListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Bean
public MessageListener ackListener(){
return new ChannelAwareMessageListener() {
//指定监听队列
@RabbitListener(queues = "que1")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long tag = message.getMessageProperties().getDeliveryTag();
System.out.println(new String(message.getBody()));
try {
System.out.println("que1==========>处理业务逻辑");
//第二个参数为是否支持多消息同时接收
channel.basicAck(tag, true);
} catch (Exception e){
System.out.println("发生异常,拒绝接收");
//第二个参数同上,第三个参数为是否返回Queue重新发送
channel.basicNack(tag,true,true);
}
}
};
}

消费端限流

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #设置同时接收的最大消息条数

TTL

存活时间:指定时间后未被消费的消息会被自动清除

可以对单个消息进行设置,也可以对整个队列进行设置

  • 队列设置

    创建队列时添加参数,时间单位为毫秒

    • 网页实现

      image-20210926144508826

    • 代码实现

      • 自定义Que

        1
        2
        3
        4
        HashMap<String, Object> map = new HashMap<>();
        //通过map设置参数,参数名必须为指定的参数名,可以在网页端查看
        map.put("x-message-ttl",10000);
        amqpAdmin.declareQueue(new Queue("custom.que",true,false, false, map));
      • 使用QueBuilder

        1
        2
        Queue queue = QueueBuilder.nonDurable("my.que").ttl(10000).build();
        amqpAdmin.declareQueue(queue);
  • 单消息设置

    1
    2
    3
    4
    5
    rabbitTemplate.convertAndSend("springboot.mq", "user.info", "test message", message -> {
    //设置TTL
    message.getMessageProperties().setExpiration("10000");
    return message;
    });

如果设置了队列过期时间的同时设置了单个消息的过期时间

  • 队列过期时,会将队列所有消息全部移除
  • 消息一入队列就会判断这个消息的过期条件,但只有当这个消息在消息队列的顶部时才会决定是否移除

死信队列DLX

image-20210926153958945

Dead Letter Exchange:当消息成为Dead message之后,可以被重新发送到另一个交换机,这个交换机就是DLX

DLX实际上与一般的交换机没有什么区别,只是因为我们使用的功能不同,将其划分为DLX

消息成为死信的三种情况:

  • 队列消息长度达到上限
  • 消费者拒接消费消息:basicNack/basicReject,并且不把消息重新放回原消息队列:requeue=false
  • 原消息队列存在消过期设置,消息超时未被消费

具体实现

  • normal.exchange绑定normal.que

  • normal.que需要配置四个参数:x-message-ttl, x-max-length, x-dead-letter-exchange, x-dead-letter-routing-key

    对应最大ttl,队列最大长度,DLX交换机,DLX交换key

  • dead.exchange绑定dead.que

备份交换机

原理与死信队列类似,目的是防止主交换机在使用过程中宕机

使用方法,创建交换机时添加参数alternate-exchange

延迟队列

消息进入队列后不会立即被消费,而是到达指定时间后才会被消费

RabbitMQ并没有提供延迟队列的功能,不过我们可以使用TTL+DLX的组合实现延迟队列我的效果,如下图为检查用户是否支付的流程图

image-20210926164313081

这里就因为与上面死信队列的例子类似,具体实现省略

幂等性问题

如果用户在支付业务中由于网络延迟在同一业务中发送了多条支付信息,如何保证最终仅执行一次业务?

一般使用全局ID,每次消费消息时判断是否已经消费过

  • 唯一ID+指纹码

    利用一些手段生成唯一信息码,判断id是否存在数据库中,但在高并发场景可能会有性能问题

  • Redis原子性(推荐)

    利用Redis执行setnx操作,天然具有幂等性,实现不重复消费

优先级队列

优先级队列0~255,优先级越大越优先执行

  • 添加/声明队列时添加参数x-max-priority:表示允许设置的最大优先级

  • 发送方式

    1
    2
    3
    4
    5
    rabbitTemplate.convertAndSend("springboot.mq", "user.info", "test message", message -> {
    //设置优先级
    message.getMessageProperties().setPriority(5);
    return message;
    });

惰性队列

消息保存在内存中还是磁盘中?

惰性队列的消息是保存在磁盘中的

使用场景:消费者下线,宕机等原因长时间不能消费造成堆积时

在发送100W消息,每条消息大概占1KB时,不同队列占用内存1.2GB,惰性队列仅占用内存够1.5MB,性能提升很高

  • 正常情况:消息保存在内存中
  • 惰性队列:消息保存在外存中

创建队列时携带参数x-queue-mode = lazy即可开启

系统操作

日志与监控

日志

默认存放位置/var/log/rabbitmq/rabbit@主机名.log

使用docker在容器内路径/var/log/rabbitmq/log

消息追踪

消息追踪会降低性能,一般只在生产和测试环境开启

打开trace相关配置

1
2
rabbitmqctl trace_on
rabbitmq-plugins enable rabbitmq_tracing

这时在网页控制台的Admin界面的右侧出现新的选项:Tracing

在这里我们可以添加自己的消息追踪文件

需要注意的是,只有用户名和密码都为guest用户才能添加trace,否则会报错,具体原因未知

内存管理

rabbitmq默认内存为总内存的40%

修改可用内存大小

当发现内存爆满,连接全部挂起的情景时,选择其中一种方式即可

1
2
rabbitmqctl set_vm_memory_high_watermark 百分比 #设置百分比,一般取值为0.4~0.7
rabbitmqctl set_vm_memory_high_watermark absolute 100MB #设置绝对大小

修改预警外存大小

当可用的外存大小小于指定值时开始预警

集群搭建

集群模式

  • 主备模式

    正常情况下备用节点不提供服务

    也就是一个主/备方案,主节点提供读写,备用节点不提供读写。如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点,和 activeMQ 利用 zookeeper 做主/备一样,也可以一主多备。

  • 远程模式

    早期模式,目前使用较少

    主机作负载均衡,超过阈值的数据交由副机处理

  • 镜像模式

    节点之间的数据可以进行同步

  • 多活模式

    多个负载均衡的数据节点集群

搭建集群

注意点(无论是不是使用docker):

  • 主机名的设置
  • 使用相同的Cookie文件

创建三个RabbitMq容器

官方推荐使用--erlang-cookie来代替RABBITMQ_ERLANG_COOKIE

1
2
3
docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.8-management
docker run -d --hostname rabbitmq-s1 --name rabbitmq-s1 -p 5673:5673 --link rabbitmq:rabbitmq -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.8-management
docker run -d --hostname rabbitmq-s2 --name rabbitmq-s2 -p 5674:5674 --link rabbitmq:rabbitmq --link rabbitmq-s1:rabbitmq-s1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.8-management

配置三个容器

  • rabbitmq

    1
    2
    3
    4
    5
    docker exec -it rabbitmq bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl start_app
    exit
  • rabbitmq-s1

    –ram表示设置为内存节点

    1
    2
    3
    4
    5
    6
    docker exec -it rabbitmq-s1 bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbit@rabbitmq
    rabbitmqctl start_app
    exit
  • rabbitmq-s2

    1
    2
    3
    4
    5
    6
    docker exec -it rabbitmq-s2 bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbit@rabbitmq
    rabbitmqctl start_app
    exit

如果内存不足可以使用命令来设置最大内存

1
rabbitmqctl set_vm_memory_high_watermark absolute 400MB

效果

image-20210926223249017

解除集群的命令

1
2
3
4
5
6
7
docker exec -it rabbitmq-s1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@rabbitmq
exit

配置镜像集群

现在我们搭建的集群是主备模式,不具备同步队列的功能的,如果主节点宕机,其中的消息会全部丢失

开启方法:

在网页管理面板点击上方Admin,再选择右边Polices,添加策略,这里给出一个示例

image-20210926225341867

这些配置表示:匹配前缀为mirror的所有交换机和队列,将他们备份2份(主机+从机一共2份),使用自动选择模式,即系统为我们选择在哪一台从机上备份

查看效果:

手动创建一个新的队列,已经备份成功的队列会有显示

image-20210926225600267

image-20210926225655853

联邦模式

联邦交换机

两个不同地区的服务器A,B:A地的用户想要查看B服务器中的数据,B地的用户想要查看A服务器中的数据

开启方式:在每台机器上进行配置

1
2
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

数据由上游同步给下游,联邦交换机时交换机之间的通讯

image-20210926230927648

  1. 先创建上游交换机:上游up.exchange

  2. 打开右侧的Federation Upstreams,配置上游的节点信息

    url格式:amqp://用户名:密码@上游节点名

    image-20210926233401956

  3. 打开右侧的Polices,为下游交换机配置上游节点

    image-20210926233544997

  4. 配置成功之后可以在右侧Federation Status查看到联邦运行信息

    image-20210926233819454

联邦队列

创建步骤与联邦交换机类似:

  1. 创建上游队列up.que

  2. 配置上游节点信息(如果已经配置则跳过)

  3. 添加Polices(把Apply to改为Queues)

  4. 查看节点信息

    image-20210926234530718

Shovel

也是一种同步数据的实现方式

image-20210926234710535

开启方式:在每台机器上进行配置

1
2
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

Source端创建队列Q1,Destination端创建队列Q2,配置shovel:

image-20210926235606901

查看状态信息

image-20210926235733416