RocketMQ
RocketMQ 是阿里巴巴開源的一款分布式消息中間件,現(xiàn)已捐贈給 Apache 軟件基金會并成為頂級項目。它具有高吞吐量、高可用性、嚴(yán)格的消息順序性等特點,在眾多大型互聯(lián)網(wǎng)項目中廣泛應(yīng)用。以下從基礎(chǔ)概念、特性、應(yīng)用場景、架構(gòu)組成以及 Java 使用示例方面介紹 RocketMQ:
基礎(chǔ)概念
- 生產(chǎn)者(Producer):負(fù)責(zé)產(chǎn)生消息,將業(yè)務(wù)系統(tǒng)中的數(shù)據(jù)封裝成消息發(fā)送到 RocketMQ 服務(wù)器。生產(chǎn)者可以是各種業(yè)務(wù)應(yīng)用,比如電商系統(tǒng)中的訂單生成模塊、物流系統(tǒng)中的包裹狀態(tài)更新模塊等。
- 消費者(Consumer):負(fù)責(zé)消費消息,從 RocketMQ 服務(wù)器獲取消息并進(jìn)行業(yè)務(wù)處理。消費者可以是不同的業(yè)務(wù)邏輯,如訂單處理后的庫存更新、物流狀態(tài)變化后的通知推送等。
- 主題(Topic):是消息的邏輯分類,用于區(qū)分不同類型的消息。例如,電商系統(tǒng)中可以有 “order - topic” 用于訂單相關(guān)消息,“payment - topic” 用于支付相關(guān)消息。
- 隊列(Queue):也叫消息隊列,是 Topic 的物理分區(qū),一個 Topic 可以包含多個 Queue。通過多隊列機制,RocketMQ 可以實現(xiàn)消息的并行處理,提高消息的處理效率。
- 標(biāo)簽(Tag):對消息進(jìn)行再分類,在同一個 Topic 下,可以使用不同的 Tag 來區(qū)分不同子類型的消息。比如在 “order - topic” 中,可以有 “create - order - tag” 表示創(chuàng)建訂單消息,“cancel - order - tag” 表示取消訂單消息。
特性
- 高吞吐量:能夠支持海量消息的高并發(fā)發(fā)送和接收,適用于大數(shù)據(jù)量的實時處理場景。
- 高可用性:通過主從架構(gòu)、多副本機制等保證消息服務(wù)的高可用性,即使部分節(jié)點出現(xiàn)故障,也能確保消息不丟失且系統(tǒng)正常運行。
- 消息順序性:支持嚴(yán)格的消息順序,在一些對順序敏感的場景,如訂單處理流程,先下單消息后支付消息,能保證消息按照發(fā)送順序被消費。
- 可靠的消息投遞:提供多種消息投遞方式,如同步發(fā)送、異步發(fā)送、單向發(fā)送等,并支持消息重試機制,確保消息盡可能可靠地被投遞和處理。
應(yīng)用場景
- 異步處理:同 RabbitMQ 類似,將一些耗時的操作(如發(fā)送郵件、短信通知、復(fù)雜的業(yè)務(wù)計算等)封裝成消息發(fā)送到 RocketMQ,主線程無需等待這些操作完成,提高系統(tǒng)的響應(yīng)速度。
- 削峰填谷:在高并發(fā)場景下,如電商大促活動時,瞬間大量的請求可以先發(fā)送到 RocketMQ,后端系統(tǒng)按照自身處理能力從隊列中消費消息,避免系統(tǒng)因流量高峰而崩潰。
- 數(shù)據(jù)分發(fā):將數(shù)據(jù)從一個系統(tǒng)分發(fā)到多個不同的系統(tǒng)或模塊。例如,一個數(shù)據(jù)采集系統(tǒng)采集到數(shù)據(jù)后,通過 RocketMQ 將數(shù)據(jù)分發(fā)給數(shù)據(jù)分析模塊、數(shù)據(jù)存儲模塊等不同的下游系統(tǒng)。
架構(gòu)組成
- NameServer:是一個輕量級的元數(shù)據(jù)服務(wù),主要負(fù)責(zé)存儲和管理 Topic、Broker 等元數(shù)據(jù)信息。生產(chǎn)者和消費者通過 NameServer 獲取所需的元數(shù)據(jù),從而與相應(yīng)的 Broker 進(jìn)行通信。NameServer 可以部署多個實例,相互獨立,不存在單點故障問題。
- Broker:負(fù)責(zé)接收、存儲和轉(zhuǎn)發(fā)消息。Broker 分為 Master 和 Slave 兩種角色,Master 負(fù)責(zé)處理讀寫請求,Slave 從 Master 同步數(shù)據(jù),用于在 Master 出現(xiàn)故障時提供高可用性保障。多個 Broker 可以組成一個 Broker 集群,共同提供消息服務(wù)。
- Producer:生產(chǎn)者實例,負(fù)責(zé)將消息發(fā)送到 Broker。生產(chǎn)者與 NameServer 建立長連接,獲取 Topic 對應(yīng)的 Broker 地址列表,然后與 Broker 建立長連接進(jìn)行消息發(fā)送。
- Consumer:消費者實例,負(fù)責(zé)從 Broker 拉取消息并進(jìn)行消費。消費者同樣與 NameServer 建立長連接獲取元數(shù)據(jù),然后與 Broker 建立長連接進(jìn)行消息拉取。
Java 使用示例
以下是使用 RocketMQ 的 Java 客戶端進(jìn)行簡單消息發(fā)送和接收的示例:
- 添加依賴
在
pom.xml
中添加 RocketMQ 客戶端依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq - client</artifactId>
<version>4.9.4</version>
</dependency>
- 生產(chǎn)者發(fā)送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建生產(chǎn)者實例
DefaultMQProducer producer = new DefaultMQProducer("example - group");
// 設(shè)置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 啟動生產(chǎn)者
producer.start();
// 創(chuàng)建消息實例,指定 Topic、Tag 和消息內(nèi)容
Message message = new Message("example - topic", "example - tag", "Hello, RocketMQ!".getBytes());
// 發(fā)送消息
SendResult sendResult = producer.send(message);
System.out.println("SendResult: " + sendResult);
// 關(guān)閉生產(chǎn)者
producer.shutdown();
}
}
- 消費者接收消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建消費者實例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example - group");
// 設(shè)置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱 Topic 和 Tag
consumer.subscribe("example - topic", "example - tag");
// 注冊消息監(jiān)聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者
consumer.start();
System.out.println("Consumer started.");
}
}
以上示例展示了如何使用 RocketMQ 的 Java 客戶端進(jìn)行基本的消息發(fā)送和接收操作。在實際應(yīng)用中,還可以根據(jù)具體需求進(jìn)行更復(fù)雜的配置和功能實現(xiàn),如事務(wù)消息、順序消息的處理等。
#??蛣?chuàng)作賞金賽#