Spring Boot優(yōu)雅使用RocketMQ的方法實(shí)例
前言
MQ,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在傳統(tǒng)的互聯(lián)網(wǎng)架構(gòu)中通常使用MQ來對(duì)上下游來做解耦合。
舉例:當(dāng)A系統(tǒng)對(duì)B系統(tǒng)進(jìn)行消息通訊,如A系統(tǒng)發(fā)布一條系統(tǒng)公告,B系統(tǒng)可以訂閱該頻道進(jìn)行系統(tǒng)公告同步,整個(gè)過程中A系統(tǒng)并不關(guān)系B系統(tǒng)會(huì)不會(huì)同步,由訂閱該頻道的系統(tǒng)自行處理。
什么是RocketMQ?#
官方說明:
隨著使用越來越多的隊(duì)列和虛擬主題,ActiveMQ IO模塊遇到了瓶頸。我們盡力通過節(jié)流,斷路器或降級(jí)來解決此問題,但效果不佳。因此,我們那時(shí)開始關(guān)注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。
看到這里可以很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。
具有以下特性:
- 支持發(fā)布/訂閱(Pub/Sub)和點(diǎn)對(duì)點(diǎn)(P2P)消息模型
- 能夠保證嚴(yán)格的消息順序,在一個(gè)隊(duì)列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞
- 提供豐富的消息拉取模式,支持拉(pull)和推(push)兩種消息模式
- 單一隊(duì)列百萬消息的堆積能力,億級(jí)消息堆積能力
- 支持多種消息協(xié)議,如 JMS、MQTT 等
- 分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義
RocketMQ環(huán)境安裝#
下載地址:https://rocketmq.apache.org/dowloading/releases/
從官方下載二進(jìn)制或者源碼來進(jìn)行使用。源碼編譯需要Maven3.2x,JDK8
在根目錄進(jìn)行打包:
mvn -Prelease-all -DskipTests clean packager -U
distribution/target/apache-rocketmq文件夾中會(huì)存在一個(gè)文件夾版,zip,tar三個(gè)可運(yùn)行的完整程序。
使用rocketmq-4.6.0.zip:
- 啟動(dòng)名稱服務(wù) mqnamesrv.cmd
- 啟動(dòng)數(shù)據(jù)中心 mqbroker.cmd -n localhost:9876
SpringBoot環(huán)境中使用RocketMQ#
SpringBoot 入門:https://www.jb51.net/article/177449.htm
SpringBoot 常用start:https://www.jb51.net/article/177451.htm
當(dāng)前環(huán)境版本為:
- SpringBoot 2.0.6.RELEASE
- SpringCloud Finchley.RELEASE
- SpringCldod Alibaba 0.2.1.RELEASE
- RocketMQ 4.3.0
在項(xiàng)目工程中導(dǎo)入:
<!-- MQ Begin --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency> <!-- MQ End -->
由于我們這邊已經(jīng)有工程了所以就不在進(jìn)行創(chuàng)建這種過程了。主要是看看如何使用RocketMQ。
創(chuàng)建RocketMQProperties配置屬性類,類中內(nèi)容如下:
@ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { private boolean isEnable = false; private String namesrvAddr = "localhost:9876"; private String groupName = "default"; private int producerMaxMessageSize = 1024; private int producerSendMsgTimeout = 2000; private int producerRetryTimesWhenSendFailed = 2; private int consumerConsumeThreadMin = 5; private int consumerConsumeThreadMax = 30; private int consumerConsumeMessageBatchMaxSize = 1; //省略get set }
現(xiàn)在我們所有子系統(tǒng)中的生產(chǎn)者,消費(fèi)者對(duì)應(yīng):
isEnable 是否開啟mq
namesrvAddr 集群地址
groupName 分組名稱
設(shè)置為統(tǒng)一已方便系統(tǒng)對(duì)接,如有其它需求在進(jìn)行擴(kuò)展,類中我們已經(jīng)給了默認(rèn)值也可以在配置文件或配置中心中獲取配置,配置如下:
#發(fā)送同一類消息的設(shè)置為同一個(gè)group,保證唯一,默認(rèn)不需要設(shè)置,rocketmq會(huì)使用ip@pid(pid代表jvm名字)作為唯一標(biāo)示 rocketmq.groupName=please_rename_unique_group_name #是否開啟自動(dòng)配置 rocketmq.isEnable=true #mq的nameserver地址 rocketmq.namesrvAddr=127.0.0.1:9876 #消息最大長(zhǎng)度 默認(rèn)1024*4(4M) rocketmq.producer.maxMessageSize=4096 #發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000 rocketmq.producer.sendMsgTimeout=3000 #發(fā)送消息失敗重試次數(shù),默認(rèn)2 rocketmq.producer.retryTimesWhenSendFailed=2 #消費(fèi)者線程數(shù)量 rocketmq.consumer.consumeThreadMin=5 rocketmq.consumer.consumeThreadMax=32 #設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)為1條 rocketmq.consumer.consumeMessageBatchMaxSize=1
創(chuàng)建消費(fèi)者接口 RocketConsumer.java 該接口用戶約束消費(fèi)者需要的核心步驟:
/** * 消費(fèi)者接口 * * @author SimpleWu * */ public interface RocketConsumer { /** * 初始化消費(fèi)者 */ public abstract void init(); /** * 注冊(cè)監(jiān)聽 * * @param messageListener */ public void registerMessageListener(MessageListener messageListener); }
創(chuàng)建抽象消費(fèi)者 AbstractRocketConsumer.java:
/** * 消費(fèi)者基本信息 * * @author SimpelWu */ public abstract class AbstractRocketConsumer implements RocketConsumer { protected String topics; protected String tags; protected MessageListener messageListener; protected String consumerTitel; protected MQPushConsumer mqPushConsumer; /** * 必要的信息 * * @param topics * @param tags * @param consumerTitel */ public void necessary(String topics, String tags, String consumerTitel) { this.topics = topics; this.tags = tags; this.consumerTitel = consumerTitel; } public abstract void init(); @Override public void registerMessageListener(MessageListener messageListener) { this.messageListener = messageListener; } }
在類中我們必須指定這個(gè)topics,tags與消息監(jiān)聽邏輯
public abstract void init();
該方法是用于初始化消費(fèi)者,由子類實(shí)現(xiàn)。
接下來我們編寫自動(dòng)配置類RocketMQConfiguation.java,該類用戶初始化一個(gè)默認(rèn)的生產(chǎn)者連接,以及加載所有的消費(fèi)者。
@EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置文件
@Configuration 標(biāo)注為配置類
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有當(dāng)配置中指定rocketmq.isEnable = true的時(shí)候才會(huì)生效
核心內(nèi)容如下:
/** * mq配置 * * @author SimpleWu */ @Configuration @EnableConfigurationProperties({ RocketMQProperties.class }) @ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") public class RocketMQConfiguation { private RocketMQProperties properties; private ApplicationContext applicationContext; private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class); public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) { this.properties = properties; this.applicationContext = applicationContext; } /** * 注入一個(gè)默認(rèn)的消費(fèi)者 * @return * @throws MQClientException */ @Bean public DefaultMQProducer getRocketMQProducer() throws MQClientException { if (StringUtils.isEmpty(properties.getGroupName())) { throw new MQClientException(-1, "groupName is blank"); } if (StringUtils.isEmpty(properties.getNamesrvAddr())) { throw new MQClientException(-1, "nameServerAddr is blank"); } DefaultMQProducer producer; producer = new DefaultMQProducer(properties.getGroupName()); producer.setNamesrvAddr(properties.getNamesrvAddr()); // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); // 如果需要同一個(gè)jvm中不同的producer往不同的mq集群發(fā)送消息,需要設(shè)置不同的instanceName // producer.setInstanceName(instanceName); producer.setMaxMessageSize(properties.getProducerMaxMessageSize()); producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout()); // 如果發(fā)送消息失敗,設(shè)置重試次數(shù),默認(rèn)為2次 producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed()); try { producer.start(); log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(), properties.getNamesrvAddr()); } catch (MQClientException e) { log.error(String.format("producer is error {}", e.getMessage(), e)); throw e; } return producer; } /** * SpringBoot啟動(dòng)時(shí)加載所有消費(fèi)者 */ @PostConstruct public void initConsumer() { Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class); if (consumers == null || consumers.size() == 0) { log.info("init rocket consumer 0"); } Iterator<String> beans = consumers.keySet().iterator(); while (beans.hasNext()) { String beanName = (String) beans.next(); AbstractRocketConsumer consumer = consumers.get(beanName); consumer.init(); createConsumer(consumer); log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags, consumer.topics); } } /** * 通過消費(fèi)者信心創(chuàng)建消費(fèi)者 * * @param consumerPojo */ public void createConsumer(AbstractRocketConsumer arc) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName()); consumer.setNamesrvAddr(this.properties.getNamesrvAddr()); consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin()); consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax()); consumer.registerMessageListener(arc.messageListenerConcurrently); /** * 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi) 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) */ // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /** * 設(shè)置消費(fèi)模型,集群還是廣播,默認(rèn)為集群 */ // consumer.setMessageModel(MessageModel.CLUSTERING); /** * 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)為1條 */ consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize()); try { consumer.subscribe(arc.topics, arc.tags); consumer.start(); arc.mqPushConsumer=consumer; } catch (MQClientException e) { log.error("info consumer title {}", arc.consumerTitel, e); } } }
然后在src/main/resources文件夾中創(chuàng)建目錄與文件META-INF/spring.factories里面添加自動(dòng)配置類即可開啟啟動(dòng)配置,我們只需要導(dǎo)入依賴即可:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.xcloud.config.rocketmq.RocketMQConfiguation
接下來在服務(wù)中導(dǎo)入依賴,然后通過我們的抽象類獲取所有必要信息對(duì)消費(fèi)者進(jìn)行創(chuàng)建,該步驟會(huì)在所有消費(fèi)者初始化完成后進(jìn)行,且只會(huì)管理是Spring Bean的消費(fèi)者。
下面我們看看如何創(chuàng)建一個(gè)消費(fèi)者,創(chuàng)建消費(fèi)者的步驟非常簡(jiǎn)單,只需要繼承AbstractRocketConsumer然后再加上Spring的@Component就能夠完成消費(fèi)者的創(chuàng)建,我們可以在類中自定義消費(fèi)的主題與標(biāo)簽。
在項(xiàng)目可以根據(jù)需求當(dāng)消費(fèi)者創(chuàng)建失敗的時(shí)候是否繼續(xù)啟動(dòng)工程。
創(chuàng)建一個(gè)默認(rèn)的消費(fèi)者 DefaultConsumerMQ.java
@Component public class DefaultConsumerMQ extends AbstractRocketConsumer { /** * 初始化消費(fèi)者 */ @Override public void init() { // 設(shè)置主題,標(biāo)簽與消費(fèi)者標(biāo)題 super.necessary("TopicTest", "*", "這是標(biāo)題"); //消費(fèi)者具體執(zhí)行邏輯 registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { msgs.forEach(msg -> { System.out.printf("consumer message boyd %s %n", new String(msg.getBody())); }); // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); } }
super.necessary("TopicTest", "*", "這是標(biāo)題"); 是必須要設(shè)置的,代表該消費(fèi)者監(jiān)聽TopicTest主題下所有tags,標(biāo)題那個(gè)字段是我自己定義的,所以對(duì)于該配置來說沒什么意義。
我們可以在這里注入Spring的Bean來進(jìn)行任意邏輯處理。
創(chuàng)建一個(gè)消息發(fā)送類進(jìn)行測(cè)試
@Override public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發(fā)送消息到一個(gè)Broker SendResult sendResult = defaultMQProducer.send(msg); // 通過sendResult返回消息是否成功送達(dá) System.out.printf("%s%n", sendResult); return null; }
我們來通過Http請(qǐng)求測(cè)試:
http://localhost:10001/demo/base/mq/hello consumer message boyd hello http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿
好了到這里簡(jiǎn)單的start算是設(shè)計(jì)完成了,后面還有一些:順序消息生產(chǎn),順序消費(fèi)消息,異步消息生產(chǎn)等一系列功能,官人可參照官方去自行處理。
- ActiveMQ 沒經(jīng)過大規(guī)模吞吐量場(chǎng)景的驗(yàn)證,社區(qū)不高不活躍。
- RabbitMQ 集群動(dòng)態(tài)擴(kuò)展麻煩,且與當(dāng)前程序語言不至于難以定制化。
- kafka 支持主要的MQ功能,功能無法達(dá)到程序需求的要求,所以不使用,且與當(dāng)前程序語言不至于難以定制化。
- rocketMQ 經(jīng)過全世界的女人的洗禮,已經(jīng)很強(qiáng)大;MQ功能較為完善,還是分布式的,擴(kuò)展性好;支持復(fù)雜MQ業(yè)務(wù)場(chǎng)景。(業(yè)務(wù)復(fù)雜可做首選)
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)我們的支持。
上一篇:spring-redis-session 自定義 key 和過期時(shí)間
欄 目:Java
本文標(biāo)題:Spring Boot優(yōu)雅使用RocketMQ的方法實(shí)例
本文地址:http://mengdiqiu.com.cn/a1/Java/8811.html
您可能感興趣的文章
- 01-10Springboot中@Value的使用詳解
- 01-10springboot實(shí)現(xiàn)文件上傳步驟解析
- 01-10springboot jta atomikos實(shí)現(xiàn)分布式事物管理
- 01-10SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備)
- 01-10如何基于SpringBoot部署外部Tomcat過程解析
- 01-10SPRING BOOT啟動(dòng)命令參數(shù)及源碼詳析
- 01-10springboot集成fastDfs過程代碼實(shí)例
- 01-10springmvc級(jí)聯(lián)屬性處理無法轉(zhuǎn)換異常問題解決
- 01-10SPRINGBOOT讀取PROPERTIES配置文件數(shù)據(jù)過程詳解
- 01-10Spring注解和同步鎖不能同步問題解決


閱讀排行
- 1C語言 while語句的用法詳解
- 2java 實(shí)現(xiàn)簡(jiǎn)單圣誕樹的示例代碼(圣誕
- 3利用C語言實(shí)現(xiàn)“百馬百擔(dān)”問題方法
- 4C語言中計(jì)算正弦的相關(guān)函數(shù)總結(jié)
- 5c語言計(jì)算三角形面積代碼
- 6什么是 WSH(腳本宿主)的詳細(xì)解釋
- 7C++ 中隨機(jī)函數(shù)random函數(shù)的使用方法
- 8正則表達(dá)式匹配各種特殊字符
- 9C語言十進(jìn)制轉(zhuǎn)二進(jìn)制代碼實(shí)例
- 10C語言查找數(shù)組里數(shù)字重復(fù)次數(shù)的方法
本欄相關(guān)
- 01-10Java實(shí)現(xiàn)動(dòng)態(tài)模擬時(shí)鐘
- 01-10Springboot中@Value的使用詳解
- 01-10JavaWeb實(shí)現(xiàn)郵件發(fā)送功能
- 01-10利用Java實(shí)現(xiàn)復(fù)制Excel工作表功能
- 01-10Java實(shí)現(xiàn)動(dòng)態(tài)數(shù)字時(shí)鐘
- 01-10java基于poi導(dǎo)出excel透視表代碼實(shí)例
- 01-10java實(shí)現(xiàn)液晶數(shù)字字體顯示當(dāng)前時(shí)間
- 01-10基于Java驗(yàn)證jwt token代碼實(shí)例
- 01-10Java動(dòng)態(tài)顯示當(dāng)前日期和時(shí)間
- 01-10淺談Java中真的只有值傳遞么
隨機(jī)閱讀
- 04-02jquery與jsp,用jquery
- 01-10delphi制作wav文件的方法
- 01-11ajax實(shí)現(xiàn)頁面的局部加載
- 01-10C#中split用法實(shí)例總結(jié)
- 08-05dedecms(織夢(mèng))副欄目數(shù)量限制代碼修改
- 01-10使用C語言求解撲克牌的順子及n個(gè)骰子
- 01-11Mac OSX 打開原生自帶讀寫NTFS功能(圖文
- 01-10SublimeText編譯C開發(fā)環(huán)境設(shè)置
- 08-05織夢(mèng)dedecms什么時(shí)候用欄目交叉功能?
- 08-05DEDE織夢(mèng)data目錄下的sessions文件夾有什