欧美1区2区3区激情无套,两个女人互添下身视频在线观看,久久av无码精品人妻系列,久久精品噜噜噜成人,末发育娇小性色xxxx

RocketMQ

RocketMQ 是阿里巴巴開源的一款分布式消息中間件,現(xiàn)已捐贈給 Apache 軟件基金會并成為頂級項目。它具有高吞吐量、高可用性、嚴(yán)格的消息順序性等特點,在眾多大型互聯(lián)網(wǎng)項目中廣泛應(yīng)用。以下從基礎(chǔ)概念、特性、應(yīng)用場景、架構(gòu)組成以及 Java 使用示例方面介紹 RocketMQ:

基礎(chǔ)概念

  1. 生產(chǎn)者(Producer):負(fù)責(zé)產(chǎn)生消息,將業(yè)務(wù)系統(tǒng)中的數(shù)據(jù)封裝成消息發(fā)送到 RocketMQ 服務(wù)器。生產(chǎn)者可以是各種業(yè)務(wù)應(yīng)用,比如電商系統(tǒng)中的訂單生成模塊、物流系統(tǒng)中的包裹狀態(tài)更新模塊等。
  2. 消費者(Consumer):負(fù)責(zé)消費消息,從 RocketMQ 服務(wù)器獲取消息并進(jìn)行業(yè)務(wù)處理。消費者可以是不同的業(yè)務(wù)邏輯,如訂單處理后的庫存更新、物流狀態(tài)變化后的通知推送等。
  3. 主題(Topic):是消息的邏輯分類,用于區(qū)分不同類型的消息。例如,電商系統(tǒng)中可以有 “order - topic” 用于訂單相關(guān)消息,“payment - topic” 用于支付相關(guān)消息。
  4. 隊列(Queue):也叫消息隊列,是 Topic 的物理分區(qū),一個 Topic 可以包含多個 Queue。通過多隊列機制,RocketMQ 可以實現(xiàn)消息的并行處理,提高消息的處理效率。
  5. 標(biāo)簽(Tag):對消息進(jìn)行再分類,在同一個 Topic 下,可以使用不同的 Tag 來區(qū)分不同子類型的消息。比如在 “order - topic” 中,可以有 “create - order - tag” 表示創(chuàng)建訂單消息,“cancel - order - tag” 表示取消訂單消息。

特性

  1. 高吞吐量:能夠支持海量消息的高并發(fā)發(fā)送和接收,適用于大數(shù)據(jù)量的實時處理場景。
  2. 高可用性:通過主從架構(gòu)、多副本機制等保證消息服務(wù)的高可用性,即使部分節(jié)點出現(xiàn)故障,也能確保消息不丟失且系統(tǒng)正常運行。
  3. 消息順序性:支持嚴(yán)格的消息順序,在一些對順序敏感的場景,如訂單處理流程,先下單消息后支付消息,能保證消息按照發(fā)送順序被消費。
  4. 可靠的消息投遞:提供多種消息投遞方式,如同步發(fā)送、異步發(fā)送、單向發(fā)送等,并支持消息重試機制,確保消息盡可能可靠地被投遞和處理。

應(yīng)用場景

  1. 異步處理:同 RabbitMQ 類似,將一些耗時的操作(如發(fā)送郵件、短信通知、復(fù)雜的業(yè)務(wù)計算等)封裝成消息發(fā)送到 RocketMQ,主線程無需等待這些操作完成,提高系統(tǒng)的響應(yīng)速度。
  2. 削峰填谷:在高并發(fā)場景下,如電商大促活動時,瞬間大量的請求可以先發(fā)送到 RocketMQ,后端系統(tǒng)按照自身處理能力從隊列中消費消息,避免系統(tǒng)因流量高峰而崩潰。
  3. 數(shù)據(jù)分發(fā):將數(shù)據(jù)從一個系統(tǒng)分發(fā)到多個不同的系統(tǒng)或模塊。例如,一個數(shù)據(jù)采集系統(tǒng)采集到數(shù)據(jù)后,通過 RocketMQ 將數(shù)據(jù)分發(fā)給數(shù)據(jù)分析模塊、數(shù)據(jù)存儲模塊等不同的下游系統(tǒng)。

架構(gòu)組成

  1. NameServer:是一個輕量級的元數(shù)據(jù)服務(wù),主要負(fù)責(zé)存儲和管理 Topic、Broker 等元數(shù)據(jù)信息。生產(chǎn)者和消費者通過 NameServer 獲取所需的元數(shù)據(jù),從而與相應(yīng)的 Broker 進(jìn)行通信。NameServer 可以部署多個實例,相互獨立,不存在單點故障問題。
  2. Broker:負(fù)責(zé)接收、存儲和轉(zhuǎn)發(fā)消息。Broker 分為 Master 和 Slave 兩種角色,Master 負(fù)責(zé)處理讀寫請求,Slave 從 Master 同步數(shù)據(jù),用于在 Master 出現(xiàn)故障時提供高可用性保障。多個 Broker 可以組成一個 Broker 集群,共同提供消息服務(wù)。
  3. Producer:生產(chǎn)者實例,負(fù)責(zé)將消息發(fā)送到 Broker。生產(chǎn)者與 NameServer 建立長連接,獲取 Topic 對應(yīng)的 Broker 地址列表,然后與 Broker 建立長連接進(jìn)行消息發(fā)送。
  4. Consumer:消費者實例,負(fù)責(zé)從 Broker 拉取消息并進(jìn)行消費。消費者同樣與 NameServer 建立長連接獲取元數(shù)據(jù),然后與 Broker 建立長連接進(jìn)行消息拉取。

Java 使用示例

以下是使用 RocketMQ 的 Java 客戶端進(jìn)行簡單消息發(fā)送和接收的示例:

  1. 添加依賴pom.xml 中添加 RocketMQ 客戶端依賴:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq - client</artifactId>
    <version>4.9.4</version>
</dependency>
  1. 生產(chǎn)者發(fā)送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建生產(chǎn)者實例
        DefaultMQProducer producer = new DefaultMQProducer("example - group");
        // 設(shè)置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 啟動生產(chǎn)者
        producer.start();

        // 創(chuàng)建消息實例,指定 Topic、Tag 和消息內(nèi)容
        Message message = new Message("example - topic", "example - tag", "Hello, RocketMQ!".getBytes());
        // 發(fā)送消息
        SendResult sendResult = producer.send(message);
        System.out.println("SendResult: " + sendResult);

        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}
  1. 消費者接收消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建消費者實例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example - group");
        // 設(shè)置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 訂閱 Topic 和 Tag
        consumer.subscribe("example - topic", "example - tag");

        // 注冊消息監(jiān)聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 啟動消費者
        consumer.start();
        System.out.println("Consumer started.");
    }
}

以上示例展示了如何使用 RocketMQ 的 Java 客戶端進(jìn)行基本的消息發(fā)送和接收操作。在實際應(yīng)用中,還可以根據(jù)具體需求進(jìn)行更復(fù)雜的配置和功能實現(xiàn),如事務(wù)消息、順序消息的處理等。

#??蛣?chuàng)作賞金賽#
全部評論

相關(guān)推薦

評論
點贊
1
分享

創(chuàng)作者周榜

更多
牛客網(wǎng)
??推髽I(yè)服務(wù)