消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式
,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。
RabbitMQ 特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
1、可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2、灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3、 消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
3、 高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
4、多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
5、多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
6、 管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
7、 跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
RabbitMQ安装
一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。
Erlang官方下载地址:https://www.erlang.org/downloads
RabbitMQ官方下载地址:https://www.rabbitmq.com/download.html
依赖包安装
安装RabbitMQ之前必须要先安装所需要的依赖包可以使用下面的一次性安装命令
yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto -y
安装Erlang
1、 将Erlang源代码包otp_src_19.3.tar.gz上传到Linux的/home目录下
2、解压erlang 源码包
1
| tar -zxvf otp_src_19.3.tar.gz
|
3、手动创建erlang 的安装目录
4、进入erlang的解压目录
5、配置erlang的安装信息
1
| ./configure --prefix=/usr/local/erlang --without-javac
|
6、编译并安装
7、配置环境变量
8、将这些配置填写到profile文件的最后
1 2 3
| ERL_HOME=/usr/local/erlang PATH=$ERL_HOME/bin:$PATH export ERL_HOME PATH
|
9、启动环境变量配置文件
开始安装
1、 将RabbitMQ安装包rabbitmq-server-3.7.2-1.el7.noarch.rpm上传到/home目录
2、安装RabbitMQ
1
| rpm -ivh --nodeps rabbitmq-server-3.7.2-1.el7.noarch.rpm
|
启动和关闭
1、启动RabbitMQ
1.5、后台启动
有时&
不能后台启动,用以下命令即可(关闭服务的状态下,后台启动)
1
| rabbitmq-server -detached
|
注意:这里可能会出现错误,错误原因是/var/lib/rabbitmq/.erlang.cookie文件权限不够。
解决方案对这个文件授权
1 2 3
| chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
|
2、停止服务
插件管理
1、添加插件
1
| rabbitmq-plugins enable {插件名}
|
2、删除插件
1
| rabbitmq-plugins disable {插件名}
|
注意:RabbitMQ启动以后可以使用浏览器进入管控台,但是默认情况RabbitMQ不允许直接使用浏览器浏览器进行访问因此必须添加插件
1
| rabbitmq-plugins enable rabbitmq_management
|
3、使用浏览器访问管控台http://RabbitMQ服务器IP:15672
http://192.168.147.130:15672
用户管理
RabbitMQ安装成功后使用默认用户名guest登录
账号:guest
密码:guest
注意:这里guest只允许本机登录访问需要创建用户并授权远程访问命令如下
1、 添加用户:rabbitmqctl add_user {username} {password}
1
| rabbitmqctl add_user root root
|
2、 删除用户:rabbitmqctl delete_user {username}
3、 修改密码:rabbitmqctl change_password {username} {newpassword}
1
| rabbitmqctl change_password root root
|
4、 设置用户角色:rabbitmqctl set_user_tags {username} {tag}
1
| rabbitmqctl set_user_tags root administrator
|
tag参数表示用户角色取值为:management、monitoring、policymaker、administrator
各角色详解:
management
用户可以通过AMQP做的任何事外加:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
policymaker
management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parameters
monitoring
management可以做的任何事外加:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息
administrator
policymaker和monitoring可以做的任何事外加:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections
权限管理
1、 授权命令:rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read}
-p vhostpath :用于指定一个资源的命名空间,例如 –p / 表示根路径命名空间
user:用于指定要为哪个用户授权填写用户名
conf:一个正则表达式match哪些配置资源能够被该用户配置。
write:一个正则表达式match哪些配置资源能够被该用户读。
read:一个正则表达式match哪些配置资源能够被该用户访问。
例如:
1
| rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
|
用于设置root用户拥有对所有资源的 读写配置权限
2、查看用户权限 rabbitmqctl list_permissions [vhostpath]
例如
查看根径经下的所有用户权限
1
| rabbitmqctl list_permissions
|
查看指定命名空间下的所有用户权限
1
| rabbitmqctl list_permissions /abc
|
3、查看指定用户下的权限rabbitmqctl list_user_permissions {username}
例如
查看root用户下的权限
1
| rabbitmqctl list_user_permissions root
|
4、清除用户权限rabbitmqctl clear_permissions {username}
例如:
清除root用户的权限
1
| rabbitmqctl clear_permissions root
|
vhost管理
vhost是RabbitMQ中的一个命名空间,可以限制消息的存放位置利用这个命名空间可以进行权限的控制有点类似Windows中的文件夹一样,在不同的文件夹中存放不同的文件。
1、添加vhost: rabbitmqctl add vhost {name}
例如
1
| rabbitmqctl add_vhost powernode
|
2、删除vhost:rabbitmqctl delete vhost {name}
例如
1
| rabbitmqctl delete_vhost powernode
|
消息发送和接收
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。
生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
上面是MQ的基本抽象模型,但是不同的MQ产品有有者不同的机制,RabbitMQ实际基于AMQP协议的一个开源实现,因此RabbitMQ内部也是AMQP的基本概念。
RabbitMQ的内部接收如下:
1、Message
消息,消息是不具体的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
2、Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
3、Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。.
4 Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
5、Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
6、Connection
网络连接,比如一个TCP连接。
7、Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
8、Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
9、Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
10、Broker
表示消息队列服务器实体。
Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型
direct
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中
。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“green”,则只转发 routing key 标记为“green”的消息,它是完全匹配、单播的模式。
fanout
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播
,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
- 适用场景:
群聊功能,广播消息给当前群聊中的所有人
大型玩家在玩在线游戏的时候,可以用它来广播重大消息
topic
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上
。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”
和“*”
。#
匹配0个或多个单词,*
匹配不多不少一个单词。
- 适用场景:
新闻的分类更新
同一个任务多个工作者协调完成
同一问题需要特定人员知晓
Java调用Rabbitmq客户端
此处是原生写法,以后一般都是用springboot的啦
在java中实现对Rabbitmq进行消息队列编程,需要导入rabbitmq客户端
。为了实现生产者和消费者的通信,接下来分别创建两个项目
进行模拟。
导包
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.13.1</version> </dependency>
|
小总结
一般情况下交换机和队列都是让监听者/消费者去创建(也要看实际业务情况)
创建工厂
、配置好地址
和端口号
、账号
与密码
、实例化链接对象
和实例化通道对象
就可以设置队列
和交换机
啦
创建队列:
1 2 3 4 5 6 7
| //创建队列 ,名字为myQueue //参数一:队列名 //参数二:消息是否持久化 //参数三:是否独有的,一般false //参数四:队列消费完后是否删除 //参数五:??? channel.queueDeclare("myQueue", true, false, false, null);
|
直连交换机
1 2 3 4 5
| //参数一:交换机名 //参数二:队列名/路由名 //参数三:道具??? //参数四:要发送的信息,需编码 channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));
|
声明交换机
1 2 3 4
| //参数一:交换机名 //参数二:类型 //参数三:是否持久化 channel.exchangeDeclare("topicExchange","topic",true);
|
绑定
1 2 3 4
| //参数一:队列名 //参数二:交换机名 //参数三:key channel.queueBind("topicQueue01","topicExchange","aa.#");
|
无交换机模式
生产者
创建发送者类Sender,编写发送消息的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Sender { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory=new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); Connection connection=null; Channel channel=null; connection=factory.newConnection(); channel=connection.createChannel(); String message ="Hello World!"; channel.queueDeclare("myQueue", true, false, false, null); channel.basicPublish("","myQueue",null,message.getBytes("UTF-8")); System.out.println("消息发送成功: "+message); channel.close(); connection.close(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class Receiver { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("root"); factory.setPassword("root"); factory.setHost("119.91.252.224"); Connection conn = factory.newConnection(); final Channel channel = conn.createChannel(); channel.queueDeclare("myQueue", true, false, false, null); boolean autoAck = true; String consumerTag = ""; channel.basicConsume("myQueue", autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } });
channel.close(); conn.close(); } }
|
direct模式
只接收相同key、路由匹配的
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class DirectReceiver { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("myDirectQue1",true,false,false,null); channel.exchangeDeclare("directExchange", "direct",true); channel.queueBind("myDirectQue1","directExchange", "directRoutingKey"); channel.basicConsume("myDirectQue1",true,"",new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费者 = " + message); } }); } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class DirectSender { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String message = "direct的消息!"; channel.basicPublish("directExchange", "directRoutingKey", null, message.getBytes("utf-8")); System.out.println("发送成功");
} }
|
fanout模式
类似广播
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class FanoutReceiver { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String queueName= channel.queueDeclare().getQueue(); channel.exchangeDeclare("fanoutExchange","fanout",true); channel.queueBind(queueName,"fanoutExchange",""); channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String(body); System.out.println("Receive01消费者 ---"+message); } }); } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class FanoutSender { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String message = "fanout的测试消息!"; channel.basicPublish("fanoutExchange", "", null, message.getBytes("utf-8")); System.out.println("消息发送成功");
} }
|
topic模式
根据不同的key接受相应的消息
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class TopicReceiver { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("topicQueue01",true,false,true,null); channel.exchangeDeclare("topicExchange","topic",true); channel.queueBind("topicQueue01","topicExchange","aa.#"); channel.basicConsume("topicQueue01",true,"",new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String(body); System.out.println("Receive01消费者aa.# ---"+message); } }); } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class TopicSender { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String message="topic的测试消息!"; channel.basicPublish("topicExchange","aa.bb.cc",null,message.getBytes("utf-8")); System.out.println("消息发送成功"); } }
|
确认机制
在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?
RabbitMQ为我们提供了两种方式:
通过AMQP事务
机制实现,这也是AMQP协议层面提供的解决方案;
通过将channel设置成confirm模式
来实现;
事务机制
RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()
txSelect:用于将当前channel设置成transaction模式
txCommit:用于提交事务
txRollback:用于回滚事务
在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class Sender { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = null; Channel channel = null;
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("directTransactionExchange", "direct", true);
String message = "事务的第一条消息";
String message2 = "事务的第二条消息";
long start = System.currentTimeMillis(); channel.txSelect();
for (int i = 0; i < 10000; i++) { channel.basicPublish("directTransactionExchange", "transactionRoutingKey", null, message.getBytes("utf-8"));
channel.basicPublish("directTransactionExchange", "transactionRoutingKey", null, message2.getBytes("utf-8")); }
channel.txCommit(); long end = System.currentTimeMillis() - start; System.out.println("运行时间" + end); System.out.println("消息发送成功"); } }
|
接收者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class Reciver { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = null; Channel channel = null;
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("transactionQueue", true, false, false, null); channel.exchangeDeclare("directTransactionExchange", "direct", true); channel.queueBind("transactionQueue", "directTransactionExchange", "transactionRoutingKey");
channel.basicConsume("transactionQueue", true, "", new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费者 ---" + message); } });
} }
|
生产者确认模式
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;
开启confirm模式的方法:
生产者通过调用channel的confirmSelect方法将channel设置为confirm模式,(注意一点,已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的)
普通confirm模式
每发送一条消息,调用waitForConfirms()
方法等待服务端confirm,这实际上是一种串行的confirm,每publish一条消息之后就等待服务端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class SenderConfirm { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = null; Channel channel = null;
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
String message="普通发送者确认模式测试消息!"; channel.confirmSelect();
long time=System.currentTimeMillis(); for(int i=1;i<=100;i++){ message="Hello World!..."+i; System.out.println(message); channel.basicPublish("","confirmQueue",null,message.getBytes("utf-8")); } System.out.println("单条确认使用时间="+(System.currentTimeMillis()-time)); System.out.println("消息发送成功"+message); } }
|
批量confirm模式
每发送一批消息之后,调用waitForConfirms()
方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;
这种模式生产者不是每发送一条就等待broker确认,而是发送一批,实现代码见下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class SenderPConfirm { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = null; Channel channel = null;
connection=factory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("confirmQueue",true,false,false,null); channel.confirmSelect(); long time=System.currentTimeMillis(); for(int i=0;i<10000;i++) { String message=String.format("时间=》%s",new Date().getTime()); System.out.println(message); channel.basicPublish("","confirmQueue",null,message.getBytes("utf-8")); int a = 100/0; }
channel.waitForConfirmsOrDie();
System.out.println("批量确认使用时间="+(System.currentTimeMillis()-time)); System.out.println("全部发送完成..."); } }
|
异步Confirm模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public class SenderYConfirm { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.252.224"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root");
Connection connection = null; Channel channel = null;
connection=factory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("confirmQueue",true,false,false,null);
channel.confirmSelect(); long time=System.currentTimeMillis(); for(int i=0;i<10000;i++) { String message = "生产者:"+i; System.out.println(message); channel.basicPublish("", "confirmQueue", null, message.getBytes("utf-8")); } channel.addConfirmListener(new ConfirmListener() { public void handleAck(long l, boolean b) throws IOException { System.out.println("消息被确认了 --- 消息编号:" + l + " 是否确认了多条:" + b); }
public void handleNack(long l, boolean b) throws IOException { System.out.println("消息没有被确认-----消息编号:" + l + " 是否没有确认多条:" + b); } });
System.out.println("批量确认使用时间="+(System.currentTimeMillis()-time)); System.out.println("全部发送完成..."); } }
|
可以看到,虽然我们还是发送了100条消息,同样我们并没有收到100个ack消息 ,只收到两个或者三个ack消息,并且这两个ack消息的multiple域都为true,这点和测试1是相同的,你多次运行程序会发现每次发送回来的ack消息中的deliveryTag域的值并不是一样的,说明broker端批量回传给发送者的ack消息并不是以固定的批量大小回传的;
类似测核酸,十个人为一组进行抽查
从以上测试示例时间就可以看到waitForConfirmsOrDie方法发送100条消息并且全部收到确认需要135ms,测试2中通过监听器的方式仅仅需要1ms,说明调用waitForConfirmsOrDie会造成程序的阻塞,通过监听器并不会造成程序的阻塞
SpringBoot整合
之前的代码可以看到有很多地方冗余,spring boot就是减少重复代码的。
导包,配置文件
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
1 2 3 4
| spring.rabbitmq.addresses=119.91.252.224 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=root
|
direct
发送者
1 2 3 4 5 6 7 8 9 10 11
| @Component public class PaymentNotifySender { @Autowired private AmqpTemplate rabbitTemplate;
public void sender(String msg){ System.out.println("notify.payment 已发送消息: "+msg); rabbitTemplate.convertAndSend("notify.payment", msg); } }
|
@Test
调用发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @SpringBootTest class RabbitmqSenderApplicationTests {
@Autowired private PaymentNotifySender sender;
@Test public void test_sender1() { long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { sender.sender("测试1:"+i); } System.out.println(System.currentTimeMillis() - start); } }
|
接收者
1 2 3 4 5 6 7 8
| @Configuration public class DirectConfig { @Bean public Queue paymentNotifyQueue() { return new Queue("notify.payment"); } }
|
1 2 3 4 5 6 7 8
| @Component @RabbitListener(queues = "notify.payment") public class PaymentNotifyReceive { @RabbitHandler public void receive(String msg) { System.out.println("notify.payment接收消息: "+msg); } }
|
Topic模式
topic转发信息主要是依据通配符,队列和交换机的绑定。主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
生产者
用于创建队列,交换机,绑定交换机。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Configuration public class TopicConfig {
@Bean public Queue coreQueue() { return new Queue("api.core"); } @Bean public Queue paymentQueue() { return new Queue("api.payment"); }
@Bean public TopicExchange coreExchange() { return new TopicExchange("coreExchange"); } @Bean public TopicExchange paymentExchange() { return new TopicExchange("paymentExchange"); }
@Bean
public Binding bindingCoreExchange(Queue coreQueue, TopicExchange coreExchange) { return BindingBuilder.bind(coreQueue).to(coreExchange).with("api.core.*"); } @Bean public Binding bindingPaymentExchange(Queue paymentQueue, TopicExchange paymentExchange) { return BindingBuilder.bind(paymentQueue).to(paymentExchange).with("api.payment.#"); } }
|
负责用户消息发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component public class ApiCoreSender { @Autowired private RabbitTemplate rabbitTemplate;
public void user(String msg){ System.out.println("api.core.user send message: "+msg); rabbitTemplate.convertAndSend("coreExchange", "api.core.user", msg); }
public void userQuery(String msg){ System.out.println("api.core.user.query send message: "+msg); rabbitTemplate.convertAndSend("coreExchange", "api.core.core.query", msg); } }
|
负责订单消息发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Component public class ApiPaymentSender { @Autowired private RabbitTemplate rabbitTemplate;
public void order(String msg){ System.out.println("api.payment.order send message: "+msg); rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order", msg); }
public void orderQuery(String msg){ System.out.println("api.payment.order.query send message: "+msg); rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.query", msg); }
public void orderDetailQuery(String msg){ System.out.println("api.payment.order.detail.query send message: "+msg); rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.detail.query", msg); } }
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13
| @SpringBootTest public class ApiCoreSenderTest { @Autowired private ApiCoreSender sender; @Test public void test_user() { sender.user("用户管理!"); } @Test public void test_userQuery() { sender.userQuery("查询用户信息!"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @SpringBootTest public class ApiPaymentSenderTest { @Autowired private ApiPaymentSender sender;
@Test public void test_order() { sender.order("订单管理!"); }
@Test public void test_orderQuery() { sender.orderQuery("查询订单信息!"); }
@Test public void test_orderDetailQuery() { sender.orderDetailQuery("查询订单详情信息!"); } }
|
消费者
1 2 3 4 5 6 7 8 9
| @Component public class ApiCoreRecive { @RabbitHandler @RabbitListener(queues = "api.core") public void user(String msg) { System.out.println("api.core 接受的消息: " + msg); } }
|
1 2 3 4 5 6 7 8 9
| @Component public class ApiPaymentRecive { @RabbitHandler @RabbitListener(queues = "api.payment") public void order(String msg) { System.out.println("api.payment.order 接收消息: "+msg); } }
|
启动消费者项目,分别调用生产者的用户消息和订单消息的发送,测试消费者接受到消息。
fanout模式
是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.在广播模式中,发送者只要发送消息,所有绑定的队列都会接收到,所以在本例中,将交换机,队列的创建都交给消费者项目完成。
广播:发送到路由器的消息会使得绑定到该路由器的每一个Queue接收到消息,这个时候就算指定了Key,或者规则(即上文中convertAndSend方法的参数2),也会被忽略!
交换机类型:FanoutExchange
rabbitTemplate.convertAndSend(“交换机名”,“ ”,“消息内容”);//路由键被忽略
消费端:只要是绑定到该交换机上的都能收到消息
生产者
1 2 3 4 5 6 7 8 9 10
| @Component public class ApiReportSender { @Autowired private AmqpTemplate rabbitTemplate;
public void generateReports(String msg){ System.out.println("api.generate.reports 发送消息: "+msg); rabbitTemplate.convertAndSend("reportExchange", "api.generate.reports", msg); } }
|
测试
1 2 3 4 5 6 7 8 9
| @SpringBootTest public class ApiReportSenderTest { @Autowired private ApiReportSender sender; @Test public void test_generateReports() { sender.generateReports("开始生成报表!"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Configuration public class FanoutConfig { @Bean public Queue reportPaymentQueue() { return new Queue("api.report.payment"); } @Bean public Queue reportRefundQueue() { return new Queue("api.report.refund"); } @Bean public FanoutExchange reportExchange() { return new FanoutExchange("reportExchange"); }
@Bean public Binding bindingReportPaymentExchange(Queue reportPaymentQueue, FanoutExchange reportExchange) { return BindingBuilder.bind(reportPaymentQueue).to(reportExchange); } @Bean public Binding bindingReportRefundExchange(Queue reportRefundQueue, FanoutExchange reportExchange) { return BindingBuilder.bind(reportRefundQueue).to(reportExchange); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class ApiReportReceived { @RabbitHandler @RabbitListener(queues = "api.report.payment") public void payment(String msg) { System.out.println("api.report.payment 接收的消息: "+msg); }
@RabbitHandler @RabbitListener(queues = "api.report.refund") public void refund(String msg) { System.out.println("api.report.refund 接收的消息: "+msg); } }
|
消息发送复杂对象
在生产者项目中定义实体对象Order,对象必须可序列化
在消费者项目中定义同样的实体类型Order,用于接收消息中的订单对象
集群
待记录
项目使用记录
生产控制层
1 2 3 4 5 6 7
| @PostMapping("/convertAndSend") @ApiOperation(value = "mq生产,批量插入") public R convertAndSend(@RequestBody List<Message> entitys) { fanoutProducer.sendMessage(entitys); log.info("消息开始插入"); return R.ok("sucess"); }
|
发送到mq里面去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package cn.gx.module.message.controller;
import cn.gx.api.demo.domain.Message; import com.gx.common.mq.FanoutMQConfig; import lombok.AllArgsConstructor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.List;
@Component @AllArgsConstructor(onConstructor_ = {@Autowired}) public class FanoutProducer {
private RabbitTemplate rabbitTemplate;
public void sendMessage(List<Message> message){ rabbitTemplate.convertAndSend(FanoutMQConfig.FANOUT_EXCHANGE_NAME, "", message); } }
|
预先写好订阅哪个交换机和队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| package com.gx.common.mq;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class FanoutMQConfig {
public static final String FIRST_FANOUT_QUEUE_NAME = "cn.gx.mq.fanout.first";
public static final String SECOND_FANOUT_QUEUE_NAME = "cn.gx.mq.fanout.second";
public static final String FANOUT_EXCHANGE_NAME = "cn.gx.mq.fanout.exchange";
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE_NAME); }
@Bean public Queue firstFanoutQueue() { return new Queue(FIRST_FANOUT_QUEUE_NAME); }
@Bean public Queue secondFanoutQueue() { return new Queue(SECOND_FANOUT_QUEUE_NAME); }
@Bean public Binding firstFanoutBinding() { return BindingBuilder.bind(firstFanoutQueue()).to(fanoutExchange()); }
@Bean public Binding secondFanoutBinding() { return BindingBuilder.bind(secondFanoutQueue()).to(fanoutExchange()); } }
|
接收控制层(这里有两个Message是应为一开始命名不规范导致的,一个是传输的消息,另一个是用作应答的)
这里注意Chanel导入的是这个包
import com.rabbitmq.client.Channel;
1 2 3 4 5 6 7 8 9 10
|
@ApiOperation(value = "mq消息消费,批量插入") @RabbitListener(queues = {FanoutMQConfig.FIRST_FANOUT_QUEUE_NAME}) @RabbitHandler public void addBatch(@RequestBody List<Message> entitys, org.springframework.amqp.core.Message messageTemp, Channel channel) throws IOException { baseService.addBatch(entitys, messageTemp, channel); log.info("消息插入成功:{}" + entitys); }
|
service层应答,这里省略业务逻辑
1 2 3 4 5 6 7 8 9 10 11
| @Override public void addBatch(List<Message> entitys, org.springframework.amqp.core.Message messageTemp, Channel channel) throws IOException { try { channel.basicAck(messageTemp.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); channel.basicNack(messageTemp.getMessageProperties().getDeliveryTag(), false, false); } }
|
nacos配置文件用作参考。。。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| spring: rabbitmq: primary: first listener: simple: acknowledge-mode: manual concurrency: 5 max-concurrency: 10 prefetch: 5 missing-queues-fatal: false cache: channel: size: 50 properties: first: host: 192.168.18.35 port: 5672 username: admin password: admin123 virtual-host: / publisher-confirms: true publisher-returns: true
|
服务启动不成功,因为我没@Import上mq配置文件(但我没问题呀。。。不解~~)
他原话(“问题在于你config注入spring 但是message模块启动的时候应该是找不到config那个文件,只需要启动的时候注入这个类就行”)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package cn.gx.module.message;
import cn.gx.tool.cloud.annotation.GxCloudConfig; import com.gx.common.constant.AppConstants; import com.gx.common.mq.FanoutMQConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Import;
@GxCloudConfig(basePackages = AppConstants.BASE_PACKAGES) @SpringBootApplication @Slf4j @Import({FanoutMQConfig.class}) public class MessageApplication { public static void main(String[] args) { SpringApplication.run(MessageApplication.class,args); } }
|
操作时有参考此博客
https://blog.csdn.net/weixin_44404170/article/details/121317297