Java搭建RabbitMq消息中間件過程詳解
這篇文章主要介紹了Java搭建RabbitMq消息中間件過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
前言
當(dāng)系統(tǒng)中出現(xiàn)“生產(chǎn)“和“消費(fèi)“的速度或穩(wěn)定性等因素不一致的時(shí)候,就需要消息隊(duì)列。
名詞
- exchange: 交換機(jī)
- routingkey: 路由key
- queue:隊(duì)列
控制臺(tái)端口:15672
exchange和queue是需要綁定在一起的,然后消息發(fā)送到exchange再由exchange通過routingkey發(fā)送到對(duì)應(yīng)的隊(duì)列中。
使用場(chǎng)景
1.技能訂單3分鐘自動(dòng)取消,改變狀態(tài)
2.直播開始前15分鐘提醒
3.直播狀態(tài)自動(dòng)結(jié)束
流程
生產(chǎn)者發(fā)送消息 —> order_pre_exchange交換機(jī) —> order_per_ttl_delay_queue隊(duì)列
—> 時(shí)間到期 —> order_delay_exchange交換機(jī) —> order_delay_process_queue隊(duì)列 —> 消費(fèi)者
第一步:在pom文件中添加
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
第二步:在application.properties文件中添加
spring.rabbitmq.host=172.xx.xx.xxx spring.rabbitmq.port=5672 spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
第三步:配置 OrderQueueConfig
package com.tuohang.platform.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitMQ的隊(duì)列設(shè)置(生產(chǎn)者發(fā)送的消息,永遠(yuǎn)是先進(jìn)入exchange,再通過路由,轉(zhuǎn)發(fā)到隊(duì)列) * * * @author Administrator * @version 1.0 * @Date 2018年9月18日 */ @Configuration public class OrderQueueConfig { /** * 訂單緩沖交換機(jī)名稱 */ public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange"; /** * 發(fā)送到該隊(duì)列的message會(huì)在一段時(shí)間后過期進(jìn)入到order_delay_process_queue 【隊(duì)列里所有的message都有統(tǒng)一的失效時(shí)間】 */ public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue"; /** * 訂單的交換機(jī)DLX 名字 */ final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange"; /** * 訂單message時(shí)間過期后進(jìn)入的隊(duì)列,也就是訂單實(shí)際的消費(fèi)隊(duì)列 */ public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue"; /** * 訂單在緩沖隊(duì)列過期時(shí)間(毫秒)30分鐘 */ public final static int ORDER_QUEUE_EXPIRATION = 1800000; /** * 訂單緩沖交換機(jī) * * @return */ @Bean public DirectExchange preOrderExange() { return new DirectExchange(ORDER_PRE_EXCHANGE_NAME); } /** * 創(chuàng)建order_per_ttl_delay_queue隊(duì)列,訂單消息經(jīng)過緩沖交換機(jī),會(huì)進(jìn)入該隊(duì)列 * * @return */ @Bean public Queue delayQueuePerOrderTTLQueue() { return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME) .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 設(shè)置訂單隊(duì)列的過期時(shí)間 .build(); } /** * 將order_pre_exchange綁定到order_pre_ttl_delay_queue隊(duì)列 * * @param delayQueuePerOrderTTLQueue * @param preOrderExange * @return */ @Bean public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) { return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME); } /** * 創(chuàng)建訂單的DLX exchange * * @return */ @Bean public DirectExchange delayOrderExchange() { return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME); } /** * 創(chuàng)建order_delay_process_queue隊(duì)列,也就是訂單實(shí)際消費(fèi)隊(duì)列 * * @return */ @Bean public Queue delayProcessOrderQueue() { return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build(); } /** * 將DLX綁定到實(shí)際消費(fèi)隊(duì)列 * * @param delayProcessOrderQueue * @param delayExchange * @return */ @Bean public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) { return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME); } /** * 監(jiān)聽訂單實(shí)際消費(fèi)者隊(duì)列order_delay_process_queue * * @param connectionFactory * @param processReceiver * @return */ @Bean public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory, OrderProcessReceiver processReceiver) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 監(jiān)聽order_delay_process_queue container.setMessageListener(new MessageListenerAdapter(processReceiver)); return container; } }
消費(fèi)者 OrderProcessReceiver :
package com.tuohang.platform.config; import java.util.Objects; import org.apache.tools.ant.types.resources.selectors.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; /** * 訂單延遲處理消費(fèi)者 * * * @author Administrator * @version 1.0 * @Date 2018年9月18日 */ @Component public class OrderProcessReceiver implements ChannelAwareMessageListener { private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class); String msg = "The failed message will auto retry after a certain delay"; @Override public void onMessage(Message message, Channel channel) throws Exception { try { processMessage(message); } catch (Exception e) { // 如果發(fā)生了異常,則將該消息重定向到緩沖隊(duì)列,會(huì)在一定延遲之后自動(dòng)重做 channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null, msg.getBytes()); } } /** * 處理訂單消息,如果訂單未支付,取消訂單(如果當(dāng)消息內(nèi)容為FAIL_MESSAGE的話,則需要拋出異常) * * @param message * @throws Exception */ public void processMessage(Message message) throws Exception { String realMessage = new String(message.getBody()); logger.info("Received <" + realMessage + ">"); // 取消訂單 if(!Objects.equals(realMessage, msg)) { // SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage)); System.out.println("測(cè)試111111-----------"+new Date()); System.out.println(message); } } }
或者
/** * 測(cè)試 rabbit 消費(fèi)者 * * * @author Administrator * @version 1.0 * @Date 2018年9月25日 */ @Component @RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME) public class TestProcessReceiver { private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class); String msg = "The failed message will auto retry after a certain delay"; @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { try { processMessage(message); //告訴服務(wù)器收到這條消息 已經(jīng)被我消費(fèi)了 可以在隊(duì)列刪掉;否則消息服務(wù)器以為這條消息沒處理掉 后續(xù)還會(huì)在發(fā) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 如果發(fā)生了異常,則將該消息重定向到緩沖隊(duì)列,會(huì)在一定延遲之后自動(dòng)重做 channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null, msg.getBytes()); } } /** * 處理訂單消息,如果訂單未支付,取消訂單(如果當(dāng)消息內(nèi)容為FAIL_MESSAGE的話,則需要拋出異常) * * @param message * @throws Exception */ public void processMessage(Message message) throws Exception { String realMessage = new String(message.getBody()); logger.info("Received < " + realMessage + " >"); // 取消訂單 if(!Objects.equals(realMessage, msg)) { System.out.println("測(cè)試111111-----------"+new Date()); }else { System.out.println("rabbit else..."); } } }
生產(chǎn)者
/** * 測(cè)試rabbitmq * * @return */ @RequestMapping(value = "/testrab") public String testraa() { GenericResult gr = null; try { String name = "test_pre_ttl_delay_queue"; long expiration = 10000;//10s 過期時(shí)間 rabbitTemplate.convertAndSend(name,String.valueOf(123456)); // 在單個(gè)消息上設(shè)置過期時(shí)間 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration)); } catch (ServiceException e) { e.printStackTrace(); gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage()); } return getWrite(gr); }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持我們。
上一篇:Java實(shí)現(xiàn)雙保險(xiǎn)線程的示例代碼
欄 目:Java
下一篇:SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備)
本文標(biāo)題:Java搭建RabbitMq消息中間件過程詳解
本文地址:http://mengdiqiu.com.cn/a1/Java/8918.html
您可能感興趣的文章
- 01-10Java實(shí)現(xiàn)動(dòng)態(tài)模擬時(shí)鐘
- 01-10利用Java實(shí)現(xiàn)復(fù)制Excel工作表功能
- 01-10JavaWeb實(shí)現(xiàn)郵件發(fā)送功能
- 01-10java基于poi導(dǎo)出excel透視表代碼實(shí)例
- 01-10Java實(shí)現(xiàn)動(dòng)態(tài)數(shù)字時(shí)鐘
- 01-10基于Java驗(yàn)證jwt token代碼實(shí)例
- 01-10java實(shí)現(xiàn)液晶數(shù)字字體顯示當(dāng)前時(shí)間
- 01-10淺談Java中真的只有值傳遞么
- 01-10Java動(dòng)態(tài)顯示當(dāng)前日期和時(shí)間
- 01-10如何解決線程太多導(dǎo)致java socket連接池出現(xiàn)的問題


閱讀排行
- 1C語(yǔ)言 while語(yǔ)句的用法詳解
- 2java 實(shí)現(xiàn)簡(jiǎn)單圣誕樹的示例代碼(圣誕
- 3利用C語(yǔ)言實(shí)現(xiàn)“百馬百擔(dān)”問題方法
- 4C語(yǔ)言中計(jì)算正弦的相關(guān)函數(shù)總結(jié)
- 5c語(yǔ)言計(jì)算三角形面積代碼
- 6什么是 WSH(腳本宿主)的詳細(xì)解釋
- 7C++ 中隨機(jī)函數(shù)random函數(shù)的使用方法
- 8正則表達(dá)式匹配各種特殊字符
- 9C語(yǔ)言十進(jìn)制轉(zhuǎn)二進(jìn)制代碼實(shí)例
- 10C語(yǔ)言查找數(shù)組里數(shù)字重復(fù)次數(shù)的方法
本欄相關(guān)
- 01-10Java實(shí)現(xiàn)動(dòng)態(tài)模擬時(shí)鐘
- 01-10Springboot中@Value的使用詳解
- 01-10JavaWeb實(shí)現(xiàn)郵件發(fā)送功能
- 01-10利用Java實(shí)現(xiàn)復(fù)制Excel工作表功能
- 01-10Java實(shí)現(xiàn)動(dòng)態(tài)數(shù)字時(shí)鐘
- 01-10java基于poi導(dǎo)出excel透視表代碼實(shí)例
- 01-10java實(shí)現(xiàn)液晶數(shù)字字體顯示當(dāng)前時(shí)間
- 01-10基于Java驗(yàn)證jwt token代碼實(shí)例
- 01-10Java動(dòng)態(tài)顯示當(dāng)前日期和時(shí)間
- 01-10淺談Java中真的只有值傳遞么
隨機(jī)閱讀
- 01-10使用C語(yǔ)言求解撲克牌的順子及n個(gè)骰子
- 01-11ajax實(shí)現(xiàn)頁(yè)面的局部加載
- 08-05DEDE織夢(mèng)data目錄下的sessions文件夾有什
- 01-11Mac OSX 打開原生自帶讀寫NTFS功能(圖文
- 01-10delphi制作wav文件的方法
- 04-02jquery與jsp,用jquery
- 01-10SublimeText編譯C開發(fā)環(huán)境設(shè)置
- 08-05織夢(mèng)dedecms什么時(shí)候用欄目交叉功能?
- 08-05dedecms(織夢(mèng))副欄目數(shù)量限制代碼修改
- 01-10C#中split用法實(shí)例總結(jié)