在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

利用KoP如何將Pulsar數(shù)據(jù)快速且無(wú)縫接入Apache Doris

電子工程師 ? 來(lái)源:OSC開源社區(qū) ? 作者:OSC開源社區(qū) ? 2022-08-08 15:13 ? 次閱讀

KoP 架構(gòu)介紹

KoP 是 Kafka on Pulsar 的簡(jiǎn)寫,顧名思義就是如何在 Pulsar 上實(shí)現(xiàn)對(duì) Kafka 數(shù)據(jù)的讀寫。KoP 將 Kafka 協(xié)議處理插件引入 Pulsar Broker 來(lái)實(shí)現(xiàn) Apache Pulsar 對(duì) Apache Kafka 協(xié)議的支持。將 KoP 協(xié)議處理插件添加到現(xiàn)有 Pulsar 集群后,用戶不用修改代碼就可以將現(xiàn)有的 Kafka 應(yīng)用程序和服務(wù)遷移到 Pulsar。

Apache Pulsar 主要特點(diǎn)如下:

利用企業(yè)級(jí)多租戶特性簡(jiǎn)化運(yùn)營(yíng)。

避免數(shù)據(jù)搬遷,簡(jiǎn)化操作。

利用 Apache BookKeeper 和分層存儲(chǔ)持久保留事件流。

利用 Pulsar Functions 進(jìn)行無(wú)服務(wù)器化事件處理。

KoP 架構(gòu)如下圖,通過(guò)圖可以看到 KoP 引入一個(gè)新的協(xié)議處理插件,該協(xié)議處理插件利用 Pulsar 的現(xiàn)有組件(例如 Topic 發(fā)現(xiàn)、分布式日志庫(kù)-ManagedLedger、cursor 等)來(lái)實(shí)現(xiàn) Kafka 傳輸協(xié)議。

Routine Load 訂閱 Pulsar 數(shù)據(jù)思路

Apache Doris Routine Load 支持了將 Kafka 數(shù)據(jù)接入 Apache Doris,并保障了數(shù)據(jù)接入過(guò)程中的事務(wù)性操作。Apache Pulsar 定位為一個(gè)云原生時(shí)代企業(yè)級(jí)的消息發(fā)布和訂閱系統(tǒng),已經(jīng)在很多線上服務(wù)使用。那么 Apache Pulsar 用戶如何將數(shù)據(jù)接入 Apache Doris 呢,答案是通過(guò) KoP 實(shí)現(xiàn)。

由于 KoP 直接在 Pulsar 側(cè)提供了對(duì) Kafka 的兼容,那么對(duì)于 Apache Doris 來(lái)說(shuō)可以像使用 Kafka 一樣使用 Plusar。整個(gè)過(guò)程對(duì)于 Apache Doris 來(lái)說(shuō)無(wú)需任務(wù)改變,就能將 Pulsar 數(shù)據(jù)接入 Apache Doris,并且可以獲得 Routine Load 的事務(wù)性保障。

--------------------------

| Apache Doris |

| --------------- |

| | Routine Load | |

| --------------- |

--------------------------

|Kafka Protocol(librdkafka)

------------v--------------

| --------------- |

| | KoP | |

| --------------- |

| Apache Pulsar |

--------------------------

操作實(shí)踐

Pulsar Standalone 安裝環(huán)境準(zhǔn)備:

JDK 安裝:略

下載 Pulsar 二進(jìn)制包,并解壓:

#下載

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz

#解壓并進(jìn)入安裝目錄

tar xvfz apache-pulsar-2.10.0-bin.tar.gz

cd apache-pulsar-2.10.0

組件編譯和安裝

1. 下載 KoP 源碼

git clone https://github.com/streamnative/kop.git

cd kop

2. 編譯 KoP 項(xiàng)目

mvn clean install -DskipTests

3. protocols 配置:在解壓后的 apache-pulsar 目錄下創(chuàng)建 protocols文 件夾,并把編譯好的 nar 包復(fù)制到 protocols 文件夾中。

mkdir apache-pulsar-2.10.0/protocols

# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols

cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols

4. 添加后的結(jié)果查看

[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/

pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar

KoP 配置添加

1. 在 standalone.conf 或者 broker.conf 添加如下配置

#kop適配的協(xié)議

messagingProtocols=kafka

#kop 的NAR文件路徑

protocolHandlerDirectory=。/protocols

#是否允許自動(dòng)創(chuàng)建topic

allowAutoTopicCreationType=partitioned

2. 添加如下服務(wù)監(jiān)聽配置

# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0

kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.

# If it’s not configured, it will be the same with `kafkaListeners` config by default

kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

brokerDeleteInactiveTopicsEnabled=false

當(dāng)出現(xiàn)如下錯(cuò)誤:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

添加如下配置,開啟 transactionCoordinatorEnabled

kafkaTransactionCoordinatorEnabled=true

transactionCoordinatorEnabled=true

Pulsar 啟動(dòng)

#前臺(tái)啟動(dòng)

#bin/pulsar standalone

#后臺(tái)啟動(dòng)

pulsar-daemon start standalone

創(chuàng)建 Doris 數(shù)據(jù)庫(kù)和建表

#進(jìn)入Doris

mysql -u root -h 127.0.0.1 -P 9030

# 創(chuàng)建數(shù)據(jù)庫(kù)

create database pulsar_doris;

#切換數(shù)據(jù)庫(kù)

use pulsar_doris;

#創(chuàng)建clicklog表

CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog

`clickTime` DATETIME NOT NULL COMMENT “點(diǎn)擊時(shí)間”,

`type` String NOT NULL COMMENT “點(diǎn)擊類型”,

`id` VARCHAR(100) COMMENT “唯一id”,

`user` VARCHAR(100) COMMENT “用戶名稱”,

`city` VARCHAR(50) COMMENT “所在城市”

DUPLICATE KEY(`clickTime`, `type`)

DISTRIBUTED BY HASH(`type`) BUCKETS 1

PROPERTIES (

“replication_allocation” = “tag.location.default: 1”

);

創(chuàng)建 Routine Load 任務(wù)

CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog

COLUMNS(clickTime,id,type,user)

PROPERTIES

“desired_concurrent_number”=“3”,

“max_batch_interval” = “20”,

“max_batch_rows” = “300000”,

“max_batch_size” = “209715200”,

“strict_mode” = “false”,

“format” = “json”

FROM KAFKA

“kafka_broker_list” = “127.0.0.1:9092”,

“kafka_topic” = “test”,

“property.group.id” = “doris”

);

上述命令中的參數(shù)解釋如下:

pulsar_doris :Routine Load 任務(wù)所在的數(shù)據(jù)庫(kù)

load_from_pulsar_test:Routine Load 任務(wù)名稱

clicklog:Routine Load 任務(wù)的目標(biāo)表,也就是配置 Routine Load 任務(wù)將數(shù)據(jù)導(dǎo)入到 Doris 哪個(gè)表中。

strict_mode:導(dǎo)入是否為嚴(yán)格模式,這里設(shè)置為 False。

format:導(dǎo)入數(shù)據(jù)的類型,這里配置為 Json。

kafka_broker_list:Kafka Broker 服務(wù)的地址

kafka_broker_list:Kafka Topic 名稱,也就是同步哪個(gè) Topic 上的數(shù)據(jù)。

property.group.id:消費(fèi)組 ID

數(shù)據(jù)導(dǎo)入和測(cè)試

1. 數(shù)據(jù)導(dǎo)入 構(gòu)造一個(gè) ClickLog 的數(shù)據(jù)結(jié)構(gòu),并調(diào)用 Kafka 的 Producer 發(fā)送 5000 萬(wàn)條數(shù)據(jù)到 Pulsar。 ClickLog 數(shù)據(jù)結(jié)構(gòu)如下:

public class ClickLog {

private String id;

private String user;

private String city;

private String clickTime;

private String type;

。.. //省略getter和setter

}

消息構(gòu)造和發(fā)送的核心代碼邏輯如下:

String strDateFormat = “yyyy-MM-dd HHss”;

@Autowired

private Producer producer;

try {

for(int j =0 ; j《50000;j++){

int batchSize = 1000;

for(int i = 0 ; i《batchSize ;i++){

ClickLog clickLog = new ClickLog();

clickLog.setId(UUID.randomUUID().toString());

SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);

clickLog.setClickTime(simpleDateFormat.format(new Date()));

clickLog.setType(“webset”);

clickLog.setUser(“user”+ new Random().nextInt(1000) +i);

producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));

}

}

} catch (Exception e) {

e.printStackTrace();

}

2. ROUTINE LOAD 任務(wù)查看執(zhí)行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;命令,查看導(dǎo)入任務(wù)的狀態(tài)。

mysql》 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;

*************************** 1. row ***************************

Id: 87873

Name: load_from_pulsar_test

CreateTime: 2022-05-31 1234

PauseTime: NULL

EndTime: NULL

DbName: default_cluster:pulsar_doris

TableName: clicklog1

State: RUNNING

DataSourceType: KAFKA

CurrentTaskNum: 1

JobProperties: {“partitions”:“*”,“columnToColumnExpr”:“clickTime,id,type,user”,“maxBatchIntervalS”:“20”,“whereExpr”:“*”,“dataFormat”:“json”,“timezone”:“Europe/London”,“send_batch_parallelism”:“1”,“precedingFilter”:“*”,“mergeType”:“APPEND”,“format”:“json”,“json_root”:“”,“maxBatchSizeBytes”:“209715200”,“exec_mem_limit”:“2147483648”,“strict_mode”:“false”,“jsonpaths”:“”,“deleteCondition”:“*”,“desireTaskConcurrentNum”:“3”,“maxErrorNum”:“0”,“strip_outer_array”:“false”,“currentTaskConcurrentNum”:“1”,“execMemLimit”:“2147483648”,“num_as_string”:“false”,“fuzzy_parse”:“false”,“maxBatchRows”:“300000”}

DataSourceProperties: {“topic”:“test”,“currentKafkaPartitions”:“0”,“brokerList”:“127.0.0.1:9092”}

CustomProperties: {“group.id”:“doris”,“kafka_default_offsets”:“OFFSET_END”,“client.id”:“doris.client”}

Statistic: {“receivedBytes”:5739001913,“runningTxns”:[],“errorRows”:0,“committedTaskNum”:168,“l(fā)oadedRows”:50000000,“l(fā)oadRowsRate”:23000,“abortedTaskNum”:1,“errorRowsAfterResumed”:0,“totalRows”:50000000,“unselectedRows”:0,“receivedBytesRate”:2675000,“taskExecuteTimeMs”:2144799}

Progress: {“0”:“51139566”}

Lag: {“0”:0}

ReasonOfStateChanged:

ErrorLogUrls:

OtherMsg:

1 row in set (0.00 sec)

ERROR:

No query specified

從上面結(jié)果可以看到 totalRows 為 50000000,errorRows 為 0。說(shuō)明數(shù)據(jù)不丟不重的導(dǎo)入 Apache Doris 了。

3. 數(shù)據(jù)統(tǒng)計(jì)驗(yàn)證執(zhí)行如下命令統(tǒng)計(jì)表中的數(shù)據(jù),發(fā)現(xiàn)統(tǒng)計(jì)的結(jié)果也是 50000000,符合預(yù)期。

mysql》 select count(*) from clicklog;

+----------+

| count(*) |

+----------+

| 50000000 |

+----------+

1 row in set (3.73 sec)

mysql》

通過(guò) KoP 我們實(shí)現(xiàn)了將 Apache Pulsar 數(shù)據(jù)無(wú)縫接入 Apache Doris ,無(wú)需對(duì) Routine Load 任務(wù)進(jìn)行任何修改,并保障了數(shù)據(jù)導(dǎo)入過(guò)程中的事務(wù)性。與此同時(shí),Apache Doris 社區(qū)已經(jīng)啟動(dòng)了 Apache Pulsar 原生導(dǎo)入支持的設(shè)計(jì),相信在不久后就可以直接訂閱 Pulsar 中的消息數(shù)據(jù),并保證數(shù)據(jù)導(dǎo)入過(guò)程中的 Exactly-Once 語(yǔ)義。

審核編輯:郭婷

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 服務(wù)器
    +關(guān)注

    關(guān)注

    12

    文章

    9585

    瀏覽量

    86943
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4876

    瀏覽量

    69964

原文標(biāo)題:如何將Pulsar數(shù)據(jù)快速且無(wú)縫接入Apache Doris

文章出處:【微信號(hào):OSC開源社區(qū),微信公眾號(hào):OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    用MCP百度地圖能力輕松接入DeepSeek

    如何將百度地圖的能力接入DeepSeek。本文詳細(xì)介紹通過(guò)MCP百度地圖的能力接入DeepSeek,為用戶提供精準(zhǔn)的智能規(guī)劃服務(wù)。 一
    的頭像 發(fā)表于 03-31 11:05 ?317次閱讀
    用MCP<b class='flag-5'>將</b>百度地圖能力輕松<b class='flag-5'>接入</b>DeepSeek

    如何將Linux安裝包快速轉(zhuǎn)成玲瓏包

    本篇將以 motrix 為例為大家展示如何將 Linux 安裝包快速轉(zhuǎn)成玲瓏包。
    的頭像 發(fā)表于 03-12 16:01 ?294次閱讀
    <b class='flag-5'>如何將</b>Linux安裝包<b class='flag-5'>快速</b>轉(zhuǎn)成玲瓏包

    如何將項(xiàng)目從IAR遷移到Embedded Studio

    本文描述如何將IAR EWARM項(xiàng)目遷移到SEGGER Embedded Studio(簡(jiǎn)稱SES)中。
    的頭像 發(fā)表于 02-25 17:11 ?365次閱讀
    <b class='flag-5'>如何將</b>項(xiàng)目從IAR遷移到Embedded Studio

    如何在DevEco Studio中利用CodeGPT接入DeepSeek

    近期DeepSeek火爆全球,那一樣很火的開發(fā)鴻蒙原生應(yīng)用的DevEco Studio如果把它接入,會(huì)發(fā)生什么“化學(xué)反應(yīng)”呢?下面我們詳細(xì)分享如何在DevEco Studio中利用CodeGPT
    的頭像 發(fā)表于 02-19 13:52 ?734次閱讀
    如何在DevEco Studio中<b class='flag-5'>利用</b>CodeGPT<b class='flag-5'>接入</b>DeepSeek

    請(qǐng)問(wèn)ccs4.2如何將采集到的數(shù)據(jù)導(dǎo)出成dat文件?

    ccs4.2如何將采集到的數(shù)據(jù)導(dǎo)出成dat文件
    發(fā)表于 01-14 08:08

    EE-191:利用SHARC DSP SPORTs實(shí)現(xiàn)無(wú)縫UART

    電子發(fā)燒友網(wǎng)站提供《EE-191:利用SHARC DSP SPORTs實(shí)現(xiàn)無(wú)縫UART.pdf》資料免費(fèi)下載
    發(fā)表于 01-06 14:40 ?0次下載
    EE-191:<b class='flag-5'>利用</b>SHARC DSP SPORTs實(shí)現(xiàn)<b class='flag-5'>無(wú)縫</b>UART

    請(qǐng)問(wèn)如何將單端輸出運(yùn)放和真差分ADC連接?

    接到19位以上的精密ADC上(ADC已選定)。問(wèn)題是:高精密高位數(shù)的ADC都是真差分輸入,而偏置和失調(diào)性能好的運(yùn)放都是單端輸出。那么如何將運(yùn)放的單端輸出和真差分輸入的ADC連接起來(lái)呢? 1. 如果我
    發(fā)表于 12-06 08:33

    工業(yè)智能網(wǎng)關(guān)快速接入移動(dòng)OneNET平臺(tái)配置操作

    新基建。 在實(shí)際應(yīng)用中,很多用戶都需要將生產(chǎn)設(shè)備數(shù)據(jù)對(duì)接到移動(dòng)OneNET平臺(tái)中,享受便捷快速的配置操作體驗(yàn),同時(shí)也不需要付出購(gòu)買運(yùn)維硬件的成本精力,快速實(shí)現(xiàn)工業(yè)物聯(lián)網(wǎng)場(chǎng)景應(yīng)用。以下介紹通過(guò)物通博聯(lián)工業(yè)智能網(wǎng)關(guān)
    的頭像 發(fā)表于 11-06 17:24 ?692次閱讀
    工業(yè)智能網(wǎng)關(guān)<b class='flag-5'>快速</b><b class='flag-5'>接入</b>移動(dòng)OneNET平臺(tái)配置操作

    使用stm32f767tlv320adc3140配置為tdm工作模式,如何將每個(gè)通道數(shù)據(jù)單獨(dú)提出出來(lái)進(jìn)而播放呢?

    使用stm32f767tlv320adc3140配置為tdm工作模式,采集到了四通道差分輸入的音頻數(shù)據(jù),如何將每個(gè)通道數(shù)據(jù)單獨(dú)提出出來(lái)進(jìn)而播放呢?
    發(fā)表于 10-09 07:47

    如何將LVDS/OLDI橋接到HDMI/DVI

    電子發(fā)燒友網(wǎng)站提供《如何將LVDS/OLDI橋接到HDMI/DVI.pdf》資料免費(fèi)下載
    發(fā)表于 09-27 09:35 ?1次下載
    <b class='flag-5'>如何將</b>LVDS/OLDI橋接到HDMI/DVI

    啟明信息完成國(guó)產(chǎn)化Doris數(shù)據(jù)庫(kù)升級(jí)替代任務(wù)

    近日,隨著集團(tuán)公司監(jiān)控平臺(tái)(Elasticsearch集群)的下線,標(biāo)志著啟明信息正式完成國(guó)產(chǎn)化Doris數(shù)據(jù)庫(kù)升級(jí)替代任務(wù)。該項(xiàng)目既標(biāo)志著啟明信息信創(chuàng)升級(jí)替代邁入新臺(tái)階,同時(shí)也標(biāo)志著在Doris應(yīng)用領(lǐng)域取得自主研發(fā)新進(jìn)展。
    的頭像 發(fā)表于 09-20 09:33 ?1668次閱讀

    如何將BQ35100配置為EOS模式

    電子發(fā)燒友網(wǎng)站提供《如何將BQ35100配置為EOS模式.pdf》資料免費(fèi)下載
    發(fā)表于 09-11 10:03 ?0次下載
    <b class='flag-5'>如何將</b>BQ35100配置為EOS模式

    鋇錸Modbus轉(zhuǎn)BACnet IP網(wǎng)關(guān)如何接入BA系統(tǒng)?

    了關(guān)鍵設(shè)備!本文詳細(xì)介紹如何將Modbus轉(zhuǎn)BACnet/IP網(wǎng)關(guān)接入BA系統(tǒng),確保數(shù)據(jù)無(wú)縫集成和高效管理。 一、B
    的頭像 發(fā)表于 07-22 16:38 ?610次閱讀
    鋇錸Modbus轉(zhuǎn)BACnet IP網(wǎng)關(guān)如何<b class='flag-5'>接入</b>BA系統(tǒng)?

    涂鴉Pulsar云消息接入技巧+省錢攻略

    月末了,相信大家都會(huì)有信用卡額度超支的擔(dān)憂,生怕一不留神就會(huì)超出預(yù)算,并且事后還需要仔細(xì)核對(duì)消費(fèi)情況。類似的焦慮,也會(huì)出現(xiàn)在使用涂鴉Pulsar云消息服務(wù)時(shí)。雖然涂鴉Pulsar云消息能滿足開發(fā)者
    的頭像 發(fā)表于 06-28 08:15 ?603次閱讀
    涂鴉<b class='flag-5'>Pulsar</b>云消息<b class='flag-5'>接入</b>技巧+省錢攻略

    如何將CYKIT-028 TFT模塊與Raspberry Pi和ESP32微控制器結(jié)合使用?

    我目前正在探索如何將 CYKIT-028 TFT 模塊與 Raspberry Pi 和 ESP32 微控制器結(jié)合使用。 不過(guò),在選擇集成開發(fā)環(huán)境(IDE)和使用 PSOC Creator 的必要性
    發(fā)表于 05-21 07:36
    主站蜘蛛池模板: 岛国大片在线 | 欧美色图综合网 | 日本视频色 | 岛国午夜精品视频在线观看 | 亚洲人成电影 | 三级在线观看免播放网站 | sihu国产午夜精品一区二区三区 | 额去鲁97在线观看视频 | 国产网站大全 | 77788色淫网站免费观看 | 日成人网| 午夜毛片不卡高清免费 | 日本黄大乳片免费观看 | 天天干网站 | 国产papa | 在线观看亚洲一区二区 | 99久久99这里只有免费费精品 | 我想看三级特黄 | 激情网婷婷| 1024国产基地永久免费 | 老师别揉我胸啊嗯上课呢视频 | 直接观看黄网站免费视频 | 日韩午夜r电影在线观看 | 直接黄91麻豆网站 | 丁香婷婷社区 | 午夜精品视频在线看 | a毛片免费观看完整 | 久久精品国产免费高清 | 精品视频一区在线观看 | 成人三级影院 | 永久免费的拍拍拍网站 | 亚洲一区二区三区在线网站 | 中文字幕色| 成人a毛片手机免费播放 | 伊人99| 夜夜干天天操 | 黄色一级片视频 | 最新黄色大片 | 亚洲人成影院在线高清 | 午夜无遮挡怕怕怕免费视频 | 亚洲一区二区视频在线观看 |