springboot+rabbitmq实战
安装
archlinux 安装启动
- yaourt -S rabbitmq
- systemctl start rabbitmq.service
- 测试服务启动
- rabbitmqctl status
- 安装管理客户端
- rabbitmq-plugins enable rabbitmq
management - web访问地址 http://127.0.0.1:15672
- rabbitmq-plugins enable rabbitmq
centos(阿里ecs)
- yaourt -S rabbitmq-server
- systemctl start rabbitmq-server.service
- 测试服务启动
- 安装管理客户端
- rabbitmq-plugins enable rabbitmq
management - web访问地址 http://47.75.36.192:15672
- rabbitmq-plugins enable rabbitmq
- 参考: https://www.rabbitmq.com/install-rpm.html
配置
增加用户并授权
- rabbitmqctl add
useradmin 123456 # 增加用户并设置用户名密码 - rabbitmqctl set
usertagsadmin administrator # 授权 - rabbitmqctl set
permissions-p / admin '.*' '.*' '.*' #
不知道干啥的但是需要不然访问不了
常用操作
相关概念
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ
在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器
(Exchange). 这样发消息者和队列就没有直接联系,
转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
虚拟主机
一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,
RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。
因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个
RabbitMQ 服务器都有一个默认的虚拟主机”/“。
交换机(Exchange)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct,
topic, Headers and Fanout Direct:direct 类型的行为是”先匹配, 再投送”.
即在绑定时设定一个 routingkey, 消息的routingkey 匹配时,
才会被交换器投送到绑定的队列中去, 默认方式
Topic:按规则转发消息(最灵活), 主要通过通配符 Headers:设置 header
attribute 参数类型的交换机 Fanout:转发消息到所有绑定队列
springboot整合
redirexchange 默认
引入依赖
1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>配置rabbit地址, application.yaml
1
2
3
4
5
6
7spring:
rabbitmq:
host: localhost
password: guest
port: 5672
username: guest增加queue配置
1
2
3
4
5
6
7
8
9
10
11
12
13@Configuration
public class RabbitConfig {
/**
* 创建一个叫hello的queue
* @return
*/
@Bean
public Queue queue(){
return new Queue("ttang");
}
}producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19@Component
@Slf4j
public class RabbitProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsgString(){
String context = "ttang say: " + new Date();
log.info(context);
rabbitTemplate.convertAndSend("ttang", context);
}
public void sendMsgInt(int i){
log.info("{}, {}", Thread.currentThread().getStackTrace()[1].getMethodName(), i);
rabbitTemplate.convertAndSend("ttang", i);
}
}consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27/**
*
* 接收消息
*
*/
@Component
@RabbitListener(queues = "ttang")
@Slf4j
public class RabbitConsumer {
@RabbitHandler
public void receiveMsgString(String tt){
log.info("收到ttang消息, {}", tt);
}
/**
* Caused by: org.springframework.amqp.AmqpException: Ambiguous methods for payload type: class java.lang.String:
* msg and msg2
* 1. 不能有两个payload同时匹配, 会根据消息类型自动匹配
* @param tt
*/
@RabbitHandler
public void receiveMsgInt(Integer tt){
log.info("收到ttang消息2, {}", tt);
}
}test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConsumerTest {
@Autowired
private RabbitProducer producer;
@Test
public void msg() {
// final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// for (int i = 0; i < 10; i++) {
// fixedThreadPool.execute(() -> {
// producer.sendMsgString();
// });
// }
for (int i = 0; i < 10; i++) {
producer.sendMsgInt(i);
producer.sendMsgString();
}
}
}
topicexchange 模式匹配订阅
依赖、地址同上
queue,exchange配置, 配置路由规则
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54/**
* 需要三个bean实例 queue, exchange, bindexchange
*
* queue1比queue2只有一个字母之差,模拟匹配
*/
@Configuration
public class TopicExchangeConfig {
/**
* 定制主题
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Queue queueMessage(){
return new Queue("topic.message");
}
/**
* 精确匹配topic.message
* @param queueMessage
* @param topicExchange
* @return
*/
@Bean
Binding bindingMessage(Queue queueMessage, TopicExchange topicExchange){
return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");
}
@Bean
public Queue queueMessages(){
return new Queue("topic.messages");
}
/**
* 匹配多个
* *表示一个词.
* #表示零个或多个词.
*
* 这个queue能接收多个消息
* @param queueMessages
* @param topicExchange
* @return
*/
@Bean
Binding bindingMessages(Queue queueMessages, TopicExchange topicExchange){
return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.#");
}
}生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32@Component
@Slf4j
public class TopicExchangeProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 同时匹配 topic.message和 topic.# 所以会发到这两个queue里
* @param msg
*/
public void sendMessage(String msg){
final StackTraceElement element = Thread.currentThread().getStackTrace()[1];
log.info("{}, {}", element.getMethodName(), msg);
rabbitTemplate.convertAndSend("topicExchange", "topic.message"
, String.join(",", Arrays.asList(element.getClassName(), element.getMethodName(), msg)));
}
/**
* 只能匹配到topic.#, 多了个s, 所以只会匹配发送到 queuemessages
* BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.#");
* 相应的接收也只会是topic.messages
* @param msg
*/
public void sendMessages(String msg){
final StackTraceElement element = Thread.currentThread().getStackTrace()[1];
log.info("{}, {}", element.getMethodName(), msg);
rabbitTemplate.convertAndSend("topicExchange", "topic.messages"
, String.join(",", Arrays.asList(element.getClassName(), element.getMethodName(), msg)));
}
}消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30/**
* 监听topic.message
*/
@Slf4j
@Component
@RabbitListener(queues = "topic.message")
public class TopicConsumerMessage {
@RabbitHandler
public void receiveMsgString(String msg){
final StackTraceElement element = Thread.currentThread().getStackTrace()[1];
log.info("{}, {}", element.getMethodName(), msg);
}
}
/**
* 监听topic.message
*/
@Slf4j
@Component
@RabbitListener(queues = "topic.messages")
public class TopicConsumerMessages {
@RabbitHandler
public void receiveMsgString(String msg){
final StackTraceElement element = Thread.currentThread().getStackTrace()[1];
log.info("{}, {}", element.getMethodName(), msg);
}
}测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicExchangeProducerTest {
@Autowired
private TopicExchangeProducer topicExchangeProducer;
/**
* 2019-08-11 12:54:46.482 INFO 18374 --- [ main] c.l.d.s.t.p.TopicExchangeProducer :
* sendMessage, 我爱你 糖糖
* <p>
* <p>
* 2019-08-11 12:54:46.502 INFO 18374 --- [ntContainer#2-1] c.l.d.s.t.c.TopicConsumerMessages :
* receiveMsgString, com.lx.demo.springbootrabbitmq.tpcexchange.producer.TopicExchangeProducer,sendMessage,我爱你 糖糖
* 2019-08-11 12:54:46.503 INFO 18374 --- [ntContainer#1-1] c.l.d.s.t.consumer.TopicConsumerMessage :
* receiveMsgString, com.lx.demo.springbootrabbitmq.tpcexchange.producer.TopicExchangeProducer,sendMessage,我爱你 糖糖
* 这里producer发送用的routing_key topic.message匹配topic.message和topic.#, 所以可以干到两个queue里, 相应的消费两个queue的都能接收到
*/
@Test
public void sendMessage() {
topicExchangeProducer.sendMessage("我爱你 糖糖");
}
/**
* 2019-08-11 12:57:00.110 INFO 18687 --- [ main] c.l.d.s.t.p.TopicExchangeProducer :
* sendMessages, i love you, ttang
* <p>
* <p>
* 2019-08-11 12:57:00.128 INFO 18687 --- [ Thread-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting
* for workers to finish.
* 2019-08-11 12:57:00.128 INFO 18687 --- [ntContainer#2-1] c.l.d.s.t.c.TopicConsumerMessages :
* receiveMsgString, com.lx.demo.springbootrabbitmq.tpcexchange.producer.TopicExchangeProducer,sendMessages,i
* love you, ttang
* messages的消息发送者用的routing_key只能匹配topic.#, 所以只能到topic.messages这个queue
*/
@Test
public void sendMessages() {
topicExchangeProducer.sendMessages("i love you, ttang");
}
}
fanoutexchange 发布订阅
依赖、地址同上
queue配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82/**
* 定义多个queue, 一个Fanoutexchange, 配置绑定binding
* 这样发送到交换机上的消息都会传递到所有绑定queue
*
* 这个最像传统的主题订阅模式
*/
@Configuration
public class FanoutExchangeConfig {
/**
* 创建交换机
* @return
*/
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
/**
* 创建queue
* @return
*/
@Bean
Queue fanoutQueue1(){
return new Queue("fanout.1");
}
/**
* 这里不在需要指定binding_key, 就算配了也么得用
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
Binding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
Queue fanoutQueue2(){
return new Queue("fanout.2");
}
@Bean
Binding binding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
@Bean
Queue fanoutQueue3(){
return new Queue("fanout.3");
}
/**
* Description:
*
* Parameter 0 of method binding1 in com.lx.demo.springbootrabbitmq.fanoutexchange.config.FanoutExchangeConfig
* required a single bean, but 6 were found:
* - queue: defined by method 'queue' in class path resource
* [com/lx/demo/springbootrabbitmq/direxchange/config/RabbitConfig.class]
* - fanoutQueue1: defined by method 'fanoutQueue1' in class path resource
* [com/lx/demo/springbootrabbitmq/fanoutexchange/config/FanoutExchangeConfig.class]
* - fanoutQueue2: defined by method 'fanoutQueue2' in class path resource
* [com/lx/demo/springbootrabbitmq/fanoutexchange/config/FanoutExchangeConfig.class]
* - fanoutQueue3: defined by method 'fanoutQueue3' in class path resource
* [com/lx/demo/springbootrabbitmq/fanoutexchange/config/FanoutExchangeConfig.class]
* - queueMessage: defined by method 'queueMessage' in class path resource
* [com/lx/demo/springbootrabbitmq/tpcexchange/config/TopicExchangeConfig.class]
* - queueMessages: defined by method 'queueMessages' in class path resource
* [com/lx/demo/springbootrabbitmq/tpcexchange/config/TopicExchangeConfig.class]
*
* @param fanoutQueue3
* @param fanoutExchange
* @return
*/
@Bean
Binding binding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
}生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18@Component
@Slf4j
public class FanoutExchangeProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* fanout的方式不再需要routing-key
* @param msg
*/
public void sendMsg(String msg){
log.info("{}, {}", Thread.currentThread().getStackTrace()[1].getMethodName(), msg);
// 这里一定要注意函数重载,只有三个参数时,第一个才是exchange
rabbitTemplate.convertAndSend("fanoutExchange","", msg);
}
}注意, 发送时是三个参数
消费者
1
2
3
4
5
6
7
8
9
10
11@Slf4j
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutConsumer1 {
@RabbitHandler
public void receiveMessageString(String msg){
log.info("{}, {}", Thread.currentThread().getStackTrace()[1].getMethodName(), msg);
}
}测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutExchangeProducerTest {
@Autowired
private FanoutExchangeProducer fanoutExchangeProducer;
/**
* 2019-08-11 13:48:16.402 INFO 26183 --- [ main] c.l.d.s.f.p.FanoutExchangeProducer : sendMsg,
* 大渣吼,我是范二特
*
*
* 2019-08-11 13:48:16.425 INFO 26183 --- [ntContainer#2-1] c.l.d.s.f.consumer.FanoutConsumer2 :
* receiveMessageString, 大渣吼,我是范二特
* 2019-08-11 13:48:16.425 INFO 26183 --- [ntContainer#1-1] c.l.d.s.f.consumer.FanoutConsumer1 :
* receiveMessageString, 大渣吼,我是范二特
* 2019-08-11 13:48:16.425 INFO 26183 --- [ntContainer#3-1] c.l.d.s.f.consumer.FanoutConsumer3 :
* receiveMessageString, 大渣吼,我是范二特
*
* 发送一条消息,上述三个消费者都收到了消息
*/
@Test
public void sendMsg() {
fanoutExchangeProducer.sendMsg("大渣吼,我是范二特");
}
}
完整代码
https://github.com/microzhao/demo/tree/master/springboot/springboot-rabbitmq