SpringBoot集成RabbitMQ-实现订单超时取消

1.解决方案#

利用RabbitMQ的死信队列机制实现订单超时自动取消;
具体为下单后发送设置了TTL的订单消息到队列A,当超时未消费时,消息被转发到对应的死信队列B,监听队列B的消费者收到消息对其进行处理;
消费者对订单进行取消操作需保证幂等,因为可能订单已经被用户主动取消;

2.Maven配置#

1
2
3
4
5
6
<!-- Spring RabbitMQ -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<!-- 版本随spring-boot-starter-parent -->
</dependency>

3.YML配置#

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.31.208
port: 5672
virtual-host: ysmspace
username: username1
password: password1

4.代码配置#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.yeshimin.ysmspace.config;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfiguration {

public static final String EXCHANGE_DEFAULT_DIRECT = "amq.direct";
public static final String QUEUE_ORDER_PENDING = "q.order.pending";
public static final String ROUTING_ORDER_PENDING = "rk.order.pending";
public static final String DLX_ORDER_PENDING = EXCHANGE_DEFAULT_DIRECT;
public static final String DLQ_ORDER_PENDING = "q.dl.order.pending";
public static final String DLR_ORDER_PENDING = "rk.dl.order.pending";

@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}

@Bean
public DirectExchange defaultDirectExchange() {
return new DirectExchange(EXCHANGE_DEFAULT_DIRECT);
}

@Bean
public Queue orderPendingQueue() {
Map<String, Object> args = new HashMap<>();
// args.put("x-message-ttl", 30 * 60 * 1000); ttl由消息实体指定
args.put("x-dead-letter-exchange", DLX_ORDER_PENDING);
args.put("x-dead-letter-routing-key", DLR_ORDER_PENDING);
return new Queue(QUEUE_ORDER_PENDING, true, false, false, args);
}

@Bean
public Queue orderPendingDeadLetterQueue() {
return new Queue(DLQ_ORDER_PENDING);
}

@Bean
public Binding orderPendingBinding() {
return new Binding(QUEUE_ORDER_PENDING, Binding.DestinationType.QUEUE,
EXCHANGE_DEFAULT_DIRECT, ROUTING_ORDER_PENDING, null);
}

@Bean
public Binding orderPendingDeadLetterBinding() {
return new Binding(DLQ_ORDER_PENDING, Binding.DestinationType.QUEUE,
DLX_ORDER_PENDING, DLR_ORDER_PENDING, null);
}
}

5.消息发布和消费及相关配置#

5.1 消息后置处理器-用于设置消息的超时时间#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.yeshimin.ysmspace.rabbitmq;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class DefaultMessagePostProcessor implements MessagePostProcessor {

@Value("${order.timeout}")
private Integer orderTimeout;

@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(String.valueOf(orderTimeout));
return message;
}
}

5.2 消息发布#

1
2
3
4
5
6
7
8
9
10
11
12
13
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DefaultMessagePostProcessor defaultMessagePostProcessor;

// ...

rabbitTemplate.convertAndSend(
RabbitMqConfiguration.EXCHANGE_DEFAULT_DIRECT,
RabbitMqConfiguration.ROUTING_SYNC_TO_ES,
JSON.toJSONString(message), defaultMessagePostProcessor);

// ...

5.3 消息消费#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.yeshimin.ysmspace.rabbitmq;

// ... import

@Slf4j
@Component
public class DefaultRabbitListener {

@Autowired
private OrderAppService orderAppService;

@RabbitListener(queues = RabbitMqConfiguration.DLQ_ORDER_PENDING)
public void listen(String message) {
log.debug("listen(), message: {}", message);

OrderPendingMessage jsonData = JSON.parseObject(message, OrderPendingMessage.class);

try {
orderAppService.cancel(jsonData.getOrderId());
} catch (Exception e) {
log.warn("未成功取消订单!");
e.printStackTrace();
}
}
}

6.版本信息#

  • springboot parent - 2.3.5.RELEASE
  • rabbitmq - 3.3.5