-
引入依賴
<dependencies>
    <!-- Core Spring Boot starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- Test support; the JUnit 4 (vintage) engine is excluded -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Spring MVC for the order / pay HTTP endpoints -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring AMQP (RabbitMQ client integration) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <!--
        JSON (de)serialization of the order messages.
        Raised from 1.2.66: the fastjson security advisory lists 1.2.80 and earlier as affected by an
        autoType deserialization bypass and names 1.2.83 as the fixed 1.x release.
    -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
-
加入配置
# Application name
spring.application.name=springboot_rabbitmq

# RabbitMQ broker connection
spring.rabbitmq.host=192.168.233.102
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# NOTE(review): credentials are hard-coded sample values; externalize them outside of a demo.
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
-
隊列配置
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.amqp.core.*; import java.util.HashMap; import java.util.Map; /** * 定義訂單的隊列年缎、交換機(jī)荡碾、隊列綁定交換機(jī)庭瑰、設(shè)置路由key */ @Configuration public class OrderRabbitConfig { @Bean public Queue orderQueueTTL(){ Map<String, Object> arguments = new HashMap<>(); // ttl = 30s arguments.put("x-message-ttl", 30 * 1000); // 設(shè)置關(guān)聯(lián)的死信隊列 if (true) { arguments.put("x-dead-letter-exchange", "ex.order.dlx"); arguments.put("x-dead-letter-routing-key", "order.pay.dlx"); } return new Queue("q.order.ttl", false, false, false, arguments); } @Bean public Exchange orderExchangeTTL() { return new DirectExchange("ex.order.ttl", false, false); } @Bean public Binding orderBindingTTL(){ return BindingBuilder.bind(orderQueueTTL()).to(orderExchangeTTL()).with("order.pay.ttl").noargs(); } }
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the dead-letter side of the order flow: the queue, the exchange, and the binding
 * (routing key) that connects them.
 */
@Configuration
public class DLXRabbitConfig {

    /**
     * Declares queue {@code q.order.dlx} with no extra arguments.
     *
     * @return the queue definition (the three boolean constructor flags are all {@code false})
     */
    @Bean
    public Queue orderQueueDLX() {
        final Queue deadLetterQueue = new Queue("q.order.dlx", false, false, false, null);
        return deadLetterQueue;
    }

    /**
     * Declares the direct exchange {@code ex.order.dlx}.
     *
     * @return the exchange definition
     */
    @Bean
    public Exchange orderExchangeDLX() {
        final Exchange deadLetterExchange = new DirectExchange("ex.order.dlx", false, false);
        return deadLetterExchange;
    }

    /**
     * Binds {@code q.order.dlx} to {@code ex.order.dlx} with routing key {@code order.pay.dlx}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding orderBindingDLX() {
        final Queue source = orderQueueDLX();
        final Exchange target = orderExchangeDLX();
        return BindingBuilder.bind(source)
                .to(target)
                .with("order.pay.dlx")
                .noargs();
    }
}
-
訂單實現
/** * 訂單控制器 */ @Controller @RequestMapping("/order") public class OrderController { @Autowired private AmqpTemplate rabbitTemplate; /** * 提交訂單 * @return */ @RequestMapping("/submit") @ResponseBody public String submit(){ String orderNo = UUID.randomUUID().toString(); final Order order = new Order(orderNo, "測試商品", new BigDecimal(1000.00), new Date(), Order.STATUS_WAIT_PAY); OrderDB.orderMap.put(orderNo, order); rabbitTemplate.convertAndSend("ex.order.ttl", "order.pay.ttl", JSONObject.toJSON(order).toString()); return "下單成功待支付:" + orderNo; } /** * 訂單列表 * @return */ @RequestMapping("/list") @ResponseBody public Object[] list(){ return OrderDB.orderMap.values().toArray(); } }
-
支付實現
/** * 支付控制器 */ @Controller @RequestMapping("/pay") public class PayController { @Autowired CachingConnectionFactory factory; @RequestMapping("/{orderNo}") @ResponseBody public String pay(@PathVariable("orderNo") String orderNo){ final Order order = OrderDB.orderMap.get(orderNo); // 從mq中拉取支付信息毁靶,確認(rèn)支付狀態(tài) if (order.getOrderStatus().equals(Order.STATUS_WAIT_PAY)) { final Connection connection = factory.createConnection(); try(final Channel channel = connection.createChannel(true)){ // 主動拉取消息: 從隊列; 指定是否自動確認(rèn)消息 final GetResponse getResponse = channel.basicGet("q.order.ttl", true); // 需要注意的 basicAck 方法需要傳遞兩個參數(shù) // deliveryTag(唯一標(biāo)識 ID):當(dāng)一個消費(fèi)者向 RabbitMQ 注冊后逊移,會建立起一個 Channel 预吆,RabbitMQ 會用 basic.deliver 方法向消費(fèi)者推送消息,這個方法攜帶了一個 delivery tag胳泉, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識 ID拐叉,是一個單調(diào)遞增的正整數(shù)岩遗,delivery tag 的范圍僅限于 Channel // multiple:為了減少網(wǎng)絡(luò)流量,手動確認(rèn)可以被批處理凤瘦,當(dāng)該參數(shù)為 true 時宿礁,則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息 channel.basicAck(getResponse.getEnvelope().getDeliveryTag(),false); // 更新訂單狀態(tài) order.setOrderStatus(Order.STATUS_PAY_SUCCESS); OrderDB.orderMap.put(orderNo, order); } catch (Exception e){ e.printStackTrace(); } finally { if (connection.isOpen()){ connection.close(); } } return "支付成功"; } else if (order.getOrderStatus().equals(Order.STATUS_PAY_SUCCESS)) { return "支付已支付成功,請勿重復(fù)支付"; } else { return "訂單已超時"; } } /** * 監(jiān)聽訂單死信隊列消息 */ @RabbitListener(queues = "q.order.dlx") public void dixLinseed(String message){ System.out.println(message); try { final Order order = JSONObject.parseObject(message, Order.class); if (order.getOrderStatus().equals(Order.STATUS_WAIT_PAY)) { order.setOrderStatus(Order.STATUS_TIMEOUT); OrderDB.orderMap.put(order.getOrderNo(), order); System.out.println("訂單支付超時:" + order.getOrderNo()); } else if (order.getOrderStatus().equals(Order.STATUS_PAY_SUCCESS)) { System.out.println("訂單已支付成功廷粒,死信隊列監(jiān)聽處理窘拯。红且。坝茎。跳過"); } } catch (Exception e){ 
e.printStackTrace(); } } }
操作地址
提交訂單:
http://localhost:8080/order/submit
訂單列表:
http://localhost:8080/order/list
訂單支付:
http://localhost:8080/pay/{orderNo}