在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

分析Hive與Spark分區(qū)策略的異同點(diǎn)

OSC開源社區(qū) ? 來源:vivo互聯(lián)網(wǎng)技術(shù) ? 2023-04-11 09:03 ? 次閱讀

隨著技術(shù)的不斷的發(fā)展,大數(shù)據(jù)領(lǐng)域?qū)τ诤A繑?shù)據(jù)的存儲(chǔ)和處理的技術(shù)框架越來越多。在離線數(shù)據(jù)處理生態(tài)系統(tǒng)最具代表性的分布式處理引擎當(dāng)屬Hive和Spark,它們?cè)诜謪^(qū)策略方面有著一些相似之處,但也存在一些不同之處。

一、概述

隨著技術(shù)的不斷的發(fā)展,大數(shù)據(jù)領(lǐng)域?qū)τ诤A繑?shù)據(jù)的存儲(chǔ)和處理的技術(shù)框架越來越多。在離線數(shù)據(jù)處理生態(tài)系統(tǒng)最具代表性的分布式處理引擎當(dāng)屬Hive和Spark,它們?cè)诜謪^(qū)策略方面有著一些相似之處,但也存在一些不同之處。本篇文章將分析Hive與Spark分區(qū)策略的異同點(diǎn)、它們各自的優(yōu)缺點(diǎn),以及一些優(yōu)化措施。

二、Hive和Spark分區(qū)概念

在了解Hive和Spark分區(qū)內(nèi)容之前,首先,我們先來回顧一下Hive和Spark的分區(qū)概念。在Hive中,分區(qū)是指將表中的數(shù)據(jù)劃分為不同的目錄或者子目錄,這些目錄或子目錄的名稱通常與表的列名相關(guān)聯(lián)。比如,一個(gè)名為“t_orders_name”的表可以按照日期分為多個(gè)目錄,每個(gè)目錄名稱對(duì)應(yīng)一個(gè)日期值。這樣做的好處是可以大大提高查詢效率,因?yàn)橹挥猩婕暗教囟ㄈ掌诘牟樵儾判枰獟呙鑼?duì)應(yīng)的目錄,而不需要去掃描整個(gè)表。Spark的分區(qū)概念與Hive類似,但是有一些不同之處,我們將在后文中進(jìn)行討論。

在Hive中,分區(qū)可以基于多個(gè)列進(jìn)行,這些列的值組合形成目錄名稱。例如,如果我們將“t_orders_name”表按照日期和地區(qū)分區(qū),那么目錄的名稱將包含日期和地區(qū)值的組合。在Hive中,數(shù)據(jù)存儲(chǔ)在分區(qū)的目錄下,而不是存儲(chǔ)在表的目錄下。這使得Hive可以快速訪問需要的數(shù)據(jù),而不必掃描整個(gè)表。另外,Hive的分區(qū)概念也可以用于數(shù)據(jù)分桶,分桶是將表中的數(shù)據(jù)劃分為固定數(shù)量的桶,每個(gè)桶包含相同的行。

而與Hive不同的是,Spark的分區(qū)是將數(shù)據(jù)分成小塊以便并行計(jì)算處理。在Spark中,分區(qū)的數(shù)量由Spark執(zhí)行引擎根據(jù)數(shù)據(jù)大小和硬件資源自動(dòng)計(jì)算得出。Spark的分區(qū)數(shù)越多,可以并行處理的數(shù)據(jù)也就越多,因此也能更快的完成計(jì)算任務(wù)。但是,如果分區(qū)數(shù)太多,將會(huì)導(dǎo)致過多的任務(wù)調(diào)度和數(shù)據(jù)傳輸開銷,從而降低整體的性能。因此,Spark分區(qū)數(shù)的選擇應(yīng)該考慮數(shù)據(jù)大小、硬件資源和計(jì)算任務(wù)復(fù)雜度等因素。

三、Hive和Spark分區(qū)的應(yīng)用場(chǎng)景

在了解Hive和Spark的分區(qū)概念之后,接下來,我們來看看Hive和Spark分區(qū)在不同的應(yīng)用場(chǎng)景中有哪些不同的優(yōu)勢(shì)。

3.1 Hive分區(qū)

Hive分區(qū)適用于大數(shù)據(jù)場(chǎng)景,可以對(duì)數(shù)據(jù)進(jìn)行多級(jí)分區(qū),以便更細(xì)粒度地劃分?jǐn)?shù)據(jù),提高查詢效率。例如,在游戲平臺(tái)的充值數(shù)據(jù)中,可以按照道具購(gòu)買日期、道具付款狀態(tài)、游戲用戶ID等多個(gè)維度進(jìn)行分區(qū)。這樣可以方便的進(jìn)行數(shù)據(jù)統(tǒng)計(jì)、分析和查詢操作,同時(shí)避免單一分區(qū)數(shù)據(jù)過大導(dǎo)致的性能問題。

3.2 Spark分區(qū)

Spark分區(qū)適用于大規(guī)模數(shù)據(jù)處理場(chǎng)景,可以充分利用集群資源進(jìn)行并行計(jì)算處理。比如,在機(jī)器學(xué)習(xí)算法的訓(xùn)練過程中,可以將大量數(shù)據(jù)進(jìn)行分區(qū),然后并行處理每個(gè)分區(qū)的數(shù)據(jù),從而提高算法的訓(xùn)練速度和效率。另外,Spark的分布式計(jì)算引擎也可以支持在多個(gè)節(jié)點(diǎn)上進(jìn)行數(shù)據(jù)分區(qū)和計(jì)算,從而提高整個(gè)集群的計(jì)算能力和效率。

簡(jiǎn)而言之,Hive和Spark分區(qū)在大數(shù)據(jù)處理和分布式計(jì)算場(chǎng)景這都有廣泛的應(yīng)用,可以通過選擇合適的分區(qū)策略和優(yōu)化措施,進(jìn)一步提高數(shù)據(jù)處理的效率和性能。

四、如何選擇分區(qū)策略

在熟悉了Hive和Spark的分區(qū)概念以及應(yīng)用場(chǎng)景后。接下來,我們來看看在Hive和Spark中如何選擇分區(qū)策略。分區(qū)策略的選擇對(duì)數(shù)據(jù)處理的效率和性能有著重要的影響。下面將分別闡述Hive和Spark分區(qū)策略的優(yōu)缺點(diǎn)以及如何選擇分區(qū)策略。

4.1 Hive分區(qū)策略

優(yōu)點(diǎn):

Hive的分區(qū)策略可以提高查詢效率和數(shù)據(jù)處理性能,特別是在大數(shù)據(jù)集上表現(xiàn)突出。另外,Hive還支持多級(jí)分區(qū),允許更細(xì)粒度的數(shù)據(jù)劃分。

缺點(diǎn):

在Hive中,分區(qū)是以目錄的形式存在的,這會(huì)導(dǎo)致大量的目錄和子目錄,如果分區(qū)過多,將會(huì)占用過多的存儲(chǔ)空間。此外,Hive的分區(qū)策略需要在創(chuàng)建表時(shí)進(jìn)行設(shè)置,如果數(shù)據(jù)分布出現(xiàn)變化,需要重新設(shè)置分區(qū)策略。

4.2 Spark分區(qū)策略

優(yōu)點(diǎn):

Spark的分區(qū)策略可以根據(jù)數(shù)據(jù)大小和硬件資源自動(dòng)計(jì)算分區(qū)數(shù),這使得計(jì)算任務(wù)可以并行計(jì)算處理,從而提高了處理效率和性能。

缺點(diǎn):

如果分區(qū)數(shù)設(shè)置不當(dāng),將會(huì)導(dǎo)致過多的任務(wù)調(diào)度和數(shù)據(jù)傳輸開銷,從而影響整體性能。此外,Spark的分區(qū)策略也需要根據(jù)數(shù)據(jù)大小、硬件資源和計(jì)算任務(wù)復(fù)雜度等因素進(jìn)行調(diào)整。

4.3 分區(qū)策略選擇

在實(shí)際項(xiàng)目開發(fā)使用中,選擇合適的分區(qū)策略可以顯著提高數(shù)據(jù)處理的效率和性能。但是,如何選擇分區(qū)策略需要根據(jù)具體情況進(jìn)行考慮,這里總結(jié)了一些分區(qū)策略選擇的場(chǎng)景:

數(shù)據(jù)集大小:如果數(shù)據(jù)集較大,可以考慮使用Hive的多級(jí)劃分策略,以便更細(xì)粒度的劃分?jǐn)?shù)據(jù),提高查詢效率。如果數(shù)據(jù)集較小,可以使用Spark自動(dòng)計(jì)算分區(qū)策略,以便充分利用硬件資源并提高計(jì)算效率。

計(jì)算任務(wù)復(fù)雜度:如果計(jì)算任務(wù)比較復(fù)雜,例如需要進(jìn)行多個(gè)JOIN操作,可以使用Hive的分桶策略,以便加快數(shù)據(jù)訪問速度,減少JOIN操作的開銷。

硬件資源:分區(qū)策略的選擇也需要考慮硬件資源的限制。如果硬件資源比較充足,可以增加分區(qū)數(shù)以提高計(jì)算效率。如果硬件資源比較緊張,需要減少分區(qū)數(shù)以避免任務(wù)調(diào)度和數(shù)據(jù)傳輸?shù)拈_銷。

綜上所述,選擇合適的分區(qū)策略需要根據(jù)具體的情況進(jìn)行考慮,包括數(shù)據(jù)集大小、計(jì)算任務(wù)復(fù)雜度和硬件資源等因素。在實(shí)際使用中,可以通過實(shí)驗(yàn)和調(diào)試來找到最佳的分區(qū)策略。

五、如何優(yōu)化分區(qū)性能

除了選擇合適的分區(qū)策略之外,還可以通過一些優(yōu)化措施來進(jìn)一步提高分區(qū)的性能。在Spark中,大多數(shù)的Spark任務(wù)可以通過三個(gè)階段來表述,它們分別是讀取輸入數(shù)據(jù)、使用Spark處理、保持輸出數(shù)據(jù)。Spark雖然實(shí)際數(shù)據(jù)處理主要發(fā)生在內(nèi)存中,但是Spark使用的是存儲(chǔ)在HDFS上的數(shù)據(jù)來作為輸入和輸出,任務(wù)的調(diào)度執(zhí)行會(huì)使用大量的 I/O,存在性能瓶頸。

而Hive分區(qū)數(shù)據(jù)是存儲(chǔ)在HDFS上的,然而HDFS對(duì)于大量小文件支持不太友好,因?yàn)樵诿總€(gè)NameNode內(nèi)存中每個(gè)文件大概有150字節(jié)的存儲(chǔ)開銷,而整個(gè)HDFS集群的IOPS數(shù)量是有上限的。當(dāng)文件寫入達(dá)到峰值時(shí),會(huì)對(duì)HDFS集群的基礎(chǔ)架構(gòu)的某些部分產(chǎn)生性能瓶頸。

5.1 通過減少 I/O 帶寬來優(yōu)化性能

在Hadoop集群中,它依靠大規(guī)模并行 I/O 來支持?jǐn)?shù)千個(gè)并發(fā)任務(wù)。比如現(xiàn)有一個(gè)大小為96TB的數(shù)據(jù)節(jié)點(diǎn),磁盤的大小有兩種,它們分別是8TB和16TB。具有8TB磁盤的數(shù)據(jù)節(jié)點(diǎn)有12塊這樣的磁盤,而具有16TB磁盤的數(shù)據(jù)節(jié)點(diǎn)有6塊這樣的磁盤。我們可以假設(shè)每個(gè)磁盤的平均讀寫吞吐量約為100MB/s,而這兩種不同的磁盤分布,它們對(duì)應(yīng)的帶寬和IOPS,具體詳情如下表所示:

ea560c18-d7fb-11ed-bfe3-dac502259ad0.jpg

5.2 通過設(shè)置參數(shù)來優(yōu)化性能

在Hadoop集群中,每個(gè)數(shù)據(jù)節(jié)點(diǎn)為每個(gè)卷運(yùn)行一個(gè)卷掃描器,用于掃描塊的狀態(tài)。由于卷掃描器與應(yīng)用程序競(jìng)爭(zhēng)磁盤資源,因此限制其磁盤帶寬很重要。配置 dfs.block.scanner.volume.bytes.per.second 屬性值來定義卷掃描器每秒可以掃描的字節(jié)數(shù),默認(rèn)為1MB/s。

比如設(shè)置帶寬為5MB/s,掃描12TB所需要的時(shí)間為

12TB / 5MBps = (12 * 1024 * 1024 / (3600 * 24)) = 29.13天。

5.3 通過優(yōu)化Spark處理分區(qū)任務(wù)來提升性能

假如,現(xiàn)在需要重新計(jì)算歷史分區(qū)的數(shù)據(jù)表,這種場(chǎng)景通常用于修復(fù)錯(cuò)誤或者數(shù)據(jù)質(zhì)量問題。在處理包含一年數(shù)據(jù)的大型數(shù)據(jù)集(比如1TB以上)時(shí),可能會(huì)將數(shù)據(jù)分成幾千個(gè)Spark分區(qū)來進(jìn)行處理。雖然,從表面上看,這種處理方法并不是最合適的,使用動(dòng)態(tài)分區(qū)并將數(shù)據(jù)結(jié)果寫入按照日期分區(qū)的Hive表中將產(chǎn)生多達(dá)上百萬個(gè)文件。

下面,我們將任務(wù)分區(qū)數(shù)縮小,現(xiàn)有一個(gè)包含3個(gè)分區(qū)的Spark任務(wù),并且想將數(shù)據(jù)寫入到包含3個(gè)分區(qū)的Hive表。在這種情況下,希望發(fā)送的是將3個(gè)文件寫入到HDFS中,所有數(shù)據(jù)都存儲(chǔ)在每個(gè)分區(qū)的單個(gè)文件中。最終會(huì)生成9個(gè)文件,并且每個(gè)文件都有1個(gè)記錄。使用動(dòng)態(tài)分區(qū)寫入Hive表時(shí),每個(gè)Spark分區(qū)都由執(zhí)行程序來并行處理。

處理Spark分區(qū)數(shù)據(jù)時(shí),每次執(zhí)行程序在給定的Spark分區(qū)中遇到新的分區(qū)時(shí),它都會(huì)打開一個(gè)新文件。默認(rèn)情況下,Spark對(duì)數(shù)據(jù)會(huì)使用Hash或者Round Robin分區(qū)器。當(dāng)應(yīng)用于任意數(shù)據(jù)時(shí),可以假設(shè)這兩種方法在整個(gè)Spark分區(qū)中相對(duì)均勻且隨機(jī)分布數(shù)據(jù)。如下圖所示:

ea67cafc-d7fb-11ed-bfe3-dac502259ad0.png

理想情況下,目標(biāo)文件大小應(yīng)該大約是HDFS塊大小的倍數(shù),默認(rèn)情況下是128MB。在Hive中,提供了一些配置參數(shù)來自動(dòng)將結(jié)果寫入到合理大小的文件中,從開發(fā)者的角度來看幾乎是透明的,比如設(shè)置屬性 hive.merge.smallfiles.avgsize 和

hive.merge.size.per.task 。但是,Spark中不存在此類功能,因此,我們需要自己開發(fā)實(shí)現(xiàn),來確定一個(gè)數(shù)據(jù)集,應(yīng)該寫入多少文件。

5.3.1 基于大小的計(jì)算

理論上,這是最直接的方法,設(shè)置目標(biāo)大小,估算數(shù)據(jù)的大小,然后進(jìn)行劃分。但是,在很多情況下,文件被寫入磁盤時(shí)會(huì)進(jìn)行壓縮,并且其格式與存儲(chǔ)在 Java 堆中的記錄格式有所不同。這意味著估算寫入磁盤時(shí)內(nèi)存的記錄大小不是一件容易的事情。雖然可以使用 Spark SizeEstimator應(yīng)用程序通過內(nèi)存中的數(shù)據(jù)的大小進(jìn)行估算。但是,SizeEstimator會(huì)考慮數(shù)據(jù)幀、數(shù)據(jù)集的內(nèi)部消耗,以及數(shù)據(jù)的大小。總體來說,這種方式不太容易準(zhǔn)確實(shí)現(xiàn)。

5.3.2 基于行數(shù)的計(jì)算

這種方法是設(shè)置目標(biāo)行數(shù),計(jì)算數(shù)據(jù)集的大小,然后執(zhí)行除法來估算目標(biāo)。我們的目標(biāo)行數(shù)可以通過多種方式確定,或者通過為所有數(shù)據(jù)集選擇一個(gè)靜態(tài)數(shù)字,或者通過確定磁盤上單個(gè)記錄的大小并執(zhí)行必要的計(jì)算。哪種方式最優(yōu),取決于你的數(shù)據(jù)集數(shù)量及其復(fù)雜性。計(jì)算相對(duì)來說成本較低,但是需要在計(jì)算前緩存以避免重新計(jì)算數(shù)據(jù)集。

5.3.3 靜態(tài)文件計(jì)算

最簡(jiǎn)單的解決方案是,只要求開發(fā)者在每個(gè)寫入任務(wù)的基礎(chǔ)上,告訴Spark總共應(yīng)該寫入多少個(gè)文件。這種方式需要給開發(fā)者一些其他方法來獲取具體的數(shù)字,可以通過這種方式來替代昂貴的計(jì)算。

5.4. 優(yōu)化Spark分發(fā)數(shù)據(jù)方式來提升性能

即使我們知道了如何將文件寫入磁盤,但是,我們?nèi)皂氉孲park以符合實(shí)際的方式來構(gòu)建我們的分區(qū)。在Spark中,它提供了許多工具來確定數(shù)據(jù)在整個(gè)分區(qū)中的分布方式。但是,各種功能中隱藏著很多復(fù)雜性,在某些情況下,它們的含義并不明顯,下面將介紹Spark提供的一些選項(xiàng)來控制Spark輸出文件的數(shù)量。

5.4.1 合并

Spark Coalesce是一個(gè)特殊版本的重新分區(qū),它只允許減少總的分區(qū),但是不需要完全的Shuffle,因此比重新分區(qū)要快得多。它通過有效的合并分區(qū)來實(shí)現(xiàn)這一點(diǎn)。如下圖所示:

ea7a918c-d7fb-11ed-bfe3-dac502259ad0.png

Coalesce在某些情況下看起來是不錯(cuò)的,但是也有一些問題。首先,Coalesce有一個(gè)難以使用的行為,以一個(gè)非常基礎(chǔ)的Spark應(yīng)用程序?yàn)槔a如下所示:

Spark

load().map(…).filter(…).save()

比如,設(shè)置的并行度為1000,但是最終只想寫入10個(gè)文件,可以設(shè)置如下:

Spark

load().map(…).filter(…).coalesce(10).save()

但是,Spark會(huì)盡可能早的有效的將合并操作下推,因此這將執(zhí)行為如下代碼:

Spark

load().coalesce(10).map(…).filter(…).save()

有效的解決這種問題的方法是在轉(zhuǎn)換和合并之間強(qiáng)制執(zhí)行,代碼如下所示:

Spark

val df = load().map(…).filter(…).cache()
df.count()
df.coalesce(10)

在Spark中,緩存是必須的,否則,你將不得不重新計(jì)算數(shù)據(jù),這可能會(huì)重新消耗計(jì)算資源。然后,緩存是需要消費(fèi)一定資源的,如果你的數(shù)據(jù)集無法放入內(nèi)存中,或者無法釋放內(nèi)存,將數(shù)據(jù)有效的存儲(chǔ)在內(nèi)存中兩次,那么必須使用磁盤緩存,這有其自身的局限性和顯著的性能損失。

此外,正如我們看到的,通常需要執(zhí)行Shuffle來獲得我們想要的更復(fù)雜的數(shù)據(jù)集結(jié)果。因此,Coalesce僅適用于特定的情況,比如如下場(chǎng)景:

保證只寫入一個(gè)Hive分區(qū);

目標(biāo)文件數(shù)少于你用于處理數(shù)據(jù)的Spark分區(qū)數(shù);

有充足的緩存資源。

5.4.2 簡(jiǎn)單重新分區(qū)

在Spark中,一個(gè)簡(jiǎn)單的重新分區(qū),可以通過設(shè)置參數(shù)來實(shí)現(xiàn),比如df.repartition(100)。在這種情況下,使用循環(huán)分區(qū)器,這意味著唯一的保證是輸出數(shù)據(jù)具有大致相同大小的Spark分區(qū),這種分區(qū)僅適用于以下情況:

保證只需要寫入一個(gè)Hive分區(qū);

正在寫入的文件數(shù)大于你的Spark分區(qū)數(shù),或者由于某些原因你無法使用合并。

5.4.3 按列重新分區(qū)

按列重新分區(qū)接收目標(biāo)Spark分區(qū)計(jì)數(shù),以及要重新分區(qū)的列序列,例如,df.repartition(100,$"date")。這對(duì)于強(qiáng)制要求Spark將具有相同鍵的數(shù)據(jù),分發(fā)到同一個(gè)分區(qū)很有用。一般來說,這對(duì)許多Spark操作(比如JOIN)很有用。

按列重新分區(qū)使用HashPartitioner,將具有相同值的數(shù)據(jù),分發(fā)給同一個(gè)分區(qū),實(shí)際上,它將執(zhí)行以下操作:

ea8c8fe0-d7fb-11ed-bfe3-dac502259ad0.png

但是,這種方法只有在每個(gè)分區(qū)鍵都可以安全的寫入到一個(gè)文件時(shí)才有效。這是因?yàn)闊o論有多少特定的Hash值,它們最終都會(huì)在同一個(gè)分區(qū)中。按列重新分區(qū)僅在你寫入一個(gè)或者多個(gè)小的Hive分區(qū)時(shí)才有效。在任何其他情況下,它都是無效的,因?yàn)槊總€(gè)Hive分區(qū)最終都會(huì)生成一個(gè)文件,僅適用于最小的數(shù)據(jù)集。

5.4.4 按具有隨機(jī)因子的列重新分區(qū)

我們可以通過添加約束的隨機(jī)因子來按列修改重新分區(qū),具體代碼如下:

Spark

df
.withColumn("rand", rand() % filesPerPartitionKey)
.repartition(100, $"key", $"rand")

理論上,只要滿足以下條件,這種方法應(yīng)該會(huì)產(chǎn)生排序規(guī)則的數(shù)據(jù)和大小均勻的文件:

Hive分區(qū)的大小大致相同;

知道每個(gè)Hive分區(qū)的目標(biāo)文件數(shù)并且可以在運(yùn)行時(shí)對(duì)其進(jìn)行編碼。

但是,即使我們滿足上述這些條件,還有另外一個(gè)問題:散列沖突。假設(shè),現(xiàn)在正在處理一年的數(shù)據(jù),日期作為分區(qū)的唯一鍵。如果每個(gè)分區(qū)需要5個(gè)文件,可以執(zhí)行如下代碼操作:

Spark

df.withColumn("rand", rand() % 5).repartition(5*365, $"date", $"rand")

在后臺(tái),Scala將構(gòu)造一個(gè)包含日期和隨機(jī)因子的鍵,例如(,<0-4>)。然后,如果我們查看HashPartitioner代碼,可以發(fā)現(xiàn)它將執(zhí)行以下操作:

Spark

class HashPartitioner(partitions: Int) extends Partitioner {
    def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
    }
}

實(shí)際上,這里面所做的事情,就是獲取關(guān)鍵元組的散列,然后使用目標(biāo)數(shù)量的Spark分區(qū)獲取它的mod。我們可以分析一下在這種情況下我們的數(shù)據(jù)將如何實(shí)現(xiàn)分布,具體代碼如下:

Spark

import java.time.LocalDate
 
def hashCodeTuple(one: String, two: Int, mod: Int): Int = {
 val rawMod = (one, two).hashCode % mod
 rawMod + (if (rawMod < 0) mod else 0)
}
def hashCodeSeq(one: String, two: Int, mod: Int): Int = {
 val rawMod = Seq(one, two).hashCode % mod
 rawMod + (if (rawMod < 0) mod else 0)
}
 
def iteration(numberDS: Int, filesPerPartition: Int): (Double, Double, Double) = {
  val hashedRandKeys = (0 to numberDS - 1).map(x => LocalDate.of(2019, 1, 1).plusDays(x)).flatMap(
    x => (0 to filesPerPartition - 1).map(y => hashCodeTuple(x.toString, y, filesPerPartition*numberDS))
  )
 
  hashedRandKeys.size // Number of unique keys, with the random factor
 
  val groupedHashedKeys = hashedRandKeys.groupBy(identity).view.mapValues(_.size).toSeq
 
  groupedHashedKeys.size // number of actual sPartitions used
 
  val sortedKeyCollisions = groupedHashedKeys.filter(_._2 != 1).sortBy(_._2).reverse
   
  val sortedSevereKeyCollisions = groupedHashedKeys.filter(_._2 > 2).sortBy(_._2).reverse
 
  sortedKeyCollisions.size // number of sPartitions with a hashing collision
 
  // (collisions, occurences)
  val collisionCounts = sortedKeyCollisions.map(_._2).groupBy(identity).view.mapValues(_.size).toSeq.sortBy(_._2).reverse
   
  (
    groupedHashedKeys.size.toDouble / hashedRandKeys.size.toDouble,
    sortedKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble,
  sortedSevereKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble
  )
}
 
val results = Seq(
  iteration(365, 1),
  iteration(365, 5),
  iteration(365, 10),
  iteration(365, 100),
  iteration(365 * 2, 100),
  iteration(365 * 5, 100),
  iteration(365 * 10, 100)
)
 
val avgEfficiency = results.map(_._1).sum / results.length
val avgCollisionRate = results.map(_._2).sum / results.length
val avgSevereCollisionRate = results.map(_._3).sum / results.length
 
(avgEfficiency, avgCollisionRate, avgSevereCollisionRate) // 63.2%, 42%, 12.6%

上面的腳本計(jì)算了3個(gè)數(shù)量:

效率:非空的Spark分區(qū)與輸出文件數(shù)量的比率;

碰撞率:(date,rand)的Hash值發(fā)送沖突的Spark分區(qū)的百分比;

嚴(yán)重沖突率:同上,但是此鍵上的沖突次數(shù)為3或者更多。

沖突很重要,因?yàn)樗鼈円馕吨覀兊腟park分區(qū)包含多個(gè)唯一的分區(qū)鍵,而我們預(yù)計(jì)每個(gè)Spark分區(qū)只有1個(gè)。我們從分析的結(jié)果可知,我們使用了63%的執(zhí)行器,并且可能會(huì)出現(xiàn)嚴(yán)重的偏差,我們將近一半的執(zhí)行正在處理比預(yù)期多2到3倍或者在某些情況下高達(dá)8倍的數(shù)據(jù)。

現(xiàn)在,有一個(gè)解決方法,即分區(qū)縮放。在之前示例中,輸出的Spark分區(qū)數(shù)量等于預(yù)期的總文件數(shù)。如果將N個(gè)對(duì)象隨機(jī)分配給N個(gè)插槽,可以預(yù)期會(huì)有多個(gè)插槽包含多個(gè)對(duì)象,并且有幾個(gè)空插槽。因此,需要解決此問題,必須要降低對(duì)象與插槽的比率。

我們通過縮放輸出分區(qū)計(jì)數(shù)來實(shí)現(xiàn)這一點(diǎn),通過將輸出Spark分區(qū)數(shù)乘以一個(gè)大因子,類似于:

Spark

df
.withColumn("rand", rand() % 5)
.repartition(5*365*SCALING_FACTOR, $"date", $"rand")

具體分析代碼如下所示:

Spark

import java.time.LocalDate
 
def hashCodeTuple(one: String, two: Int, mod: Int): Int = {
 val rawMod = (one, two).hashCode % mod
 rawMod + (if (rawMod < 0) mod else 0)
}
 
def hashCodeSeq(one: String, two: Int, mod: Int): Int = {
 val rawMod = Seq(one, two).hashCode % mod
 rawMod + (if (rawMod < 0) mod else 0)
}
 
def iteration(numberDS: Int, filesPerPartition: Int, partitionFactor: Int = 1): (Double, Double, Double, Double) = {
  val partitionCount = filesPerPartition*numberDS * partitionFactor
  val hashedRandKeys = (0 to numberDS - 1).map(x => LocalDate.of(2019, 1, 1).plusDays(x)).flatMap(
    x => (0 to filesPerPartition - 1).map(y => hashCodeTuple(x.toString, y, partitionCount))
  )
   
  hashedRandKeys.size // Number of unique keys, with the random factor
 
  val groupedHashedKeys = hashedRandKeys.groupBy(identity).view.mapValues(_.size).toSeq
 
  groupedHashedKeys.size // number of unique hashes - and thus, sPartitions with > 0 records
   
  val sortedKeyCollisions = groupedHashedKeys.filter(_._2 != 1).sortBy(_._2).reverse
   
  val sortedSevereKeyCollisions = groupedHashedKeys.filter(_._2 > 2).sortBy(_._2).reverse
 
  sortedKeyCollisions.size // number of sPartitions with a hashing collision
 
  // (collisions, occurences)
  val collisionCounts = sortedKeyCollisions.map(_._2).groupBy(identity).view.mapValues(_.size).toSeq.sortBy(_._2).reverse
   
  (
    groupedHashedKeys.size.toDouble / partitionCount,
    groupedHashedKeys.size.toDouble / hashedRandKeys.size.toDouble,
    sortedKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble,
    sortedSevereKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble
  )
}
 
// With a scale factor of 1
val results = Seq(
  iteration(365, 1),
  iteration(365, 5),
  iteration(365, 10),
  iteration(365, 100),
  iteration(365 * 2, 100),
  iteration(365 * 5, 100),
  iteration(365 * 10, 100)
)
 
val avgEfficiency = results.map(_._2).sum / results.length // What is the ratio of executors / output files
val avgCollisionRate = results.map(_._3).sum / results.length // What is the average collision rate
val avgSevereCollisionRate = results.map(_._4).sum / results.length // What is the average collision rate where 3 or more hashes collide
 
(avgEfficiency, avgCollisionRate, avgSevereCollisionRate) // 63.2% Efficiency, 42% collision rate, 12.6% severe collision rate
 
iteration(365, 5, 2) // 37.7% partitions in-use, 77.4% Efficiency, 24.4% collision rate, 4.2% severe collision rate
iteration(365, 5, 5)
iteration(365, 5, 10)
iteration(365, 5, 100)

隨著我們的比例因子接近無窮大,碰撞很快接近于0,效率接近100%。但是,這會(huì)產(chǎn)生另外一個(gè)問題,即大量Spark分區(qū)輸出將為空。同時(shí)這些空的Spark分區(qū)也會(huì)帶來一些資源開銷,增加Driver的內(nèi)存大小,會(huì)使我們更容易遇到,由于異常錯(cuò)誤而導(dǎo)致分區(qū)鍵空間意外增大的問題。

這里的一個(gè)常見方法,是在使用這種方法時(shí)不顯示設(shè)置分區(qū)(默認(rèn)并行度和縮放),如果不提供分區(qū)計(jì)數(shù),則依賴Spark默認(rèn)的spark.default.parallelism值。雖然,通常并行度自然高于總輸出文件數(shù)(因此,隱式提供大于1 的縮放因子)。如果滿足以下條件,這種方式依然是一種有效的方法:

Hive分區(qū)的文件數(shù)大致相等;

可以確定平均分區(qū)文件數(shù)應(yīng)該是多少;

大致知道唯一分區(qū)鍵的總數(shù)。

5.4.5 按范圍重新分區(qū)

按范圍重新分區(qū)是一個(gè)特列,它不使用RoundRobin和Hash Partitioner,而是使用一種特殊的方法,叫做Range Partitioner。

范圍分區(qū)器根據(jù)某些給定鍵的順序在Spark分區(qū)之間進(jìn)行拆分行,但是,它不僅僅是全局排序,而且還擁有以下特性:

具有相同散列的所有記錄將在同一個(gè)分區(qū)中結(jié)束;

所有Spark分區(qū)都將有一個(gè)最小值和最大值與之關(guān)聯(lián);

最小值和最大值將通過使用采樣來檢測(cè)關(guān)鍵頻率和范圍來確定,分區(qū)邊界將根據(jù)這些估計(jì)值進(jìn)行初始設(shè)置;

分區(qū)的大小不能保證完全相等,它們的相等性基于樣本的準(zhǔn)確性,因此,預(yù)測(cè)的每個(gè)Spark分區(qū)的最小值和最大值,分區(qū)將根據(jù)需要增大或縮小來保證前兩個(gè)條件。

總而言之,范圍分區(qū)將導(dǎo)致Spark創(chuàng)建與請(qǐng)求的Spark分區(qū)數(shù)量相等的Bucket數(shù)量,然后它將這些Bucket映射到指定分區(qū)鍵的范圍。例如,如果你的分區(qū)鍵是日期,則范圍可能是(最小值2022-01-01,最大值2023-01-01)。然后,對(duì)于每條記錄,將記錄的分區(qū)鍵與存儲(chǔ)Bucket的最小值和最大值進(jìn)行比較,并相應(yīng)的進(jìn)行分配。如下圖所示:

eaa3f98c-d7fb-11ed-bfe3-dac502259ad0.png

六、總結(jié)

在選擇分區(qū)策略時(shí),需要根據(jù)具體的應(yīng)用場(chǎng)景和需求進(jìn)行選擇。常見的分區(qū)策略包括按照時(shí)間、地域、用戶ID等多個(gè)維度進(jìn)行分區(qū)。在應(yīng)用分區(qū)策略時(shí),還可以通過一些優(yōu)化措施來進(jìn)一步提高分區(qū)的性能和效率,例如合理設(shè)置分區(qū)數(shù)、避免過多的分區(qū)列、減少重復(fù)數(shù)據(jù)等。

總之,分區(qū)是大數(shù)據(jù)處理和分布式計(jì)算中非常重要的技術(shù),可以幫助我們更好的管理和處理大規(guī)模的數(shù)據(jù),提高數(shù)據(jù)處理的效率和性能,進(jìn)而幫助我們更好的應(yīng)對(duì)數(shù)據(jù)分析和業(yè)務(wù)應(yīng)用的挑戰(zhàn)。







審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 數(shù)據(jù)存儲(chǔ)

    關(guān)注

    5

    文章

    986

    瀏覽量

    51112
  • IOPs
    +關(guān)注

    關(guān)注

    0

    文章

    11

    瀏覽量

    14319
  • HDFS
    +關(guān)注

    關(guān)注

    1

    文章

    30

    瀏覽量

    9658
  • SPARK
    +關(guān)注

    關(guān)注

    1

    文章

    105

    瀏覽量

    19991

原文標(biāo)題:Hive和Spark分區(qū)策略剖析

文章出處:【微信號(hào):OSC開源社區(qū),微信公眾號(hào):OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    電源反接制動(dòng)和倒拉反接制動(dòng)有何異同點(diǎn)

    的應(yīng)用場(chǎng)景:? ? ??●?需要在大負(fù)載下快速停止的場(chǎng)合。? ? ??●?需要精確控制停止位置的場(chǎng)合。? ? ??異同點(diǎn)分析? ? ??相同點(diǎn):? ? ??1、目的相同 :都是為了實(shí)現(xiàn)電機(jī)的快速停止
    的頭像 發(fā)表于 10-23 15:52 ?500次閱讀

    Linux磁盤分區(qū)擴(kuò)容方法

    linux分區(qū)常用命令:fdisk,修改MBR分區(qū)表,MBR格式,被修改的分區(qū)大小最大為2T。
    的頭像 發(fā)表于 10-23 11:46 ?585次閱讀
    Linux磁盤<b class='flag-5'>分區(qū)</b>擴(kuò)容方法

    使用FAL分區(qū)管理與easyflash變量管理

    1.FAL組件1.1什么是FALFAL(FlashAbstractionLayer)Flash抽象層,是對(duì)Flash及基于Flash的分區(qū)進(jìn)行管理、操作的抽象層,對(duì)上層統(tǒng)一了Flash及分區(qū)操作
    的頭像 發(fā)表于 10-01 08:10 ?1261次閱讀
    使用FAL<b class='flag-5'>分區(qū)</b>管理與easyflash變量管理

    大數(shù)據(jù)從業(yè)者必知必會(huì)的Hive SQL調(diào)優(yōu)技巧

    大數(shù)據(jù)從業(yè)者必知必會(huì)的Hive SQL調(diào)優(yōu)技巧 摘要 :在大數(shù)據(jù)領(lǐng)域中,Hive SQL被廣泛應(yīng)用于數(shù)據(jù)倉(cāng)庫的數(shù)據(jù)查詢和分析。然而,由于數(shù)據(jù)量龐大和復(fù)雜的查詢需求,Hive SQL查詢
    的頭像 發(fā)表于 09-24 13:30 ?342次閱讀

    電源反接制動(dòng)和倒拉反接制動(dòng)有何異同點(diǎn)

    電源反接制動(dòng)和倒拉反接制動(dòng)是兩種電機(jī)制動(dòng)方式,它們?cè)诠I(yè)自動(dòng)化和電機(jī)控制領(lǐng)域中有著廣泛的應(yīng)用。這兩種制動(dòng)方式各有特點(diǎn)和適用場(chǎng)景,下面我將介紹它們的異同點(diǎn)。 電源反接制動(dòng) 電源反接制動(dòng)是一種電機(jī)
    的頭像 發(fā)表于 09-19 09:10 ?1510次閱讀

    spark為什么比mapreduce快?

    spark為什么比mapreduce快? 首先澄清幾個(gè)誤區(qū): 1:兩者都是基于內(nèi)存計(jì)算的,任何計(jì)算框架都肯定是基于內(nèi)存的,所以網(wǎng)上說的spark是基于內(nèi)存計(jì)算所以快,顯然是錯(cuò)誤的 2;DAG計(jì)算模型
    的頭像 發(fā)表于 09-06 09:45 ?329次閱讀

    spark運(yùn)行的基本流程

    前言: 由于最近對(duì)spark的運(yùn)行流程非常感興趣,所以閱讀了《Spark大數(shù)據(jù)處理:技術(shù)、應(yīng)用與性能優(yōu)化》一書。通過這本書的學(xué)習(xí),了解了spark的核心技術(shù)、實(shí)際應(yīng)用場(chǎng)景以及性能優(yōu)化的方法。本文旨在
    的頭像 發(fā)表于 07-02 10:31 ?489次閱讀
    <b class='flag-5'>spark</b>運(yùn)行的基本流程

    Spark基于DPU的Native引擎算子卸載方案

    1.背景介紹 Apache Spark(以下簡(jiǎn)稱Spark)是一個(gè)開源的分布式計(jì)算框架,由UC Berkeley AMP Lab開發(fā),可用于批處理、交互式查詢(Spark SQL)、實(shí)時(shí)流處理
    的頭像 發(fā)表于 06-28 17:12 ?769次閱讀
    <b class='flag-5'>Spark</b>基于DPU的Native引擎算子卸載方案

    Spark+Hive”在DPU環(huán)境下的性能測(cè)評(píng) | OLAP數(shù)據(jù)庫引擎選型白皮書(24版)DPU部分節(jié)選

    在奇點(diǎn)云2024年版《OLAP數(shù)據(jù)庫引擎選型白皮書》中,中科馭數(shù)聯(lián)合奇點(diǎn)云針對(duì)Spark+Hive這類大數(shù)據(jù)計(jì)算場(chǎng)景下的主力引擎,測(cè)評(píng)DPU環(huán)境下對(duì)比CPU環(huán)境下的性能提升效果。特此節(jié)選該章節(jié)內(nèi)容,與大家共享。
    的頭像 發(fā)表于 05-30 16:09 ?594次閱讀
    “<b class='flag-5'>Spark+Hive</b>”在DPU環(huán)境下的性能測(cè)評(píng) | OLAP數(shù)據(jù)庫引擎選型白皮書(24版)DPU部分節(jié)選

    非平衡電橋與平衡電橋的異同點(diǎn)

    電橋是一種用于測(cè)量電阻、電容、電感等電氣參數(shù)的電路。根據(jù)電橋中各臂電阻之間的關(guān)系,電橋可以分為平衡電橋和非平衡電橋兩種類型。
    的頭像 發(fā)表于 05-13 16:39 ?5595次閱讀

    Spark基于DPU Snappy壓縮算法的異構(gòu)加速方案

    一、總體介紹 1.1 背景介紹 Apache Spark是專為大規(guī)模數(shù)據(jù)計(jì)算而設(shè)計(jì)的快速通用的計(jì)算引擎,是一種與 Hadoop 相似的開源集群計(jì)算環(huán)境,但是兩者之間還存在一些不同之處,這些不同之處使
    的頭像 發(fā)表于 03-26 17:06 ?877次閱讀
    <b class='flag-5'>Spark</b>基于DPU Snappy壓縮算法的異構(gòu)加速方案

    RDMA技術(shù)在Apache Spark中的應(yīng)用

    、電信、零售、醫(yī)療保健還是物聯(lián)網(wǎng),Spark的應(yīng)用幾乎遍及所有需要處理海量數(shù)據(jù)和復(fù)雜計(jì)算的領(lǐng)域。它的快速、易用和通用性,使得數(shù)據(jù)科學(xué)家和工程師能夠輕松實(shí)現(xiàn)數(shù)據(jù)挖掘、數(shù)據(jù)分析、實(shí)時(shí)處理等任務(wù)。 然而,在Spark的燦爛光環(huán)背后,一
    的頭像 發(fā)表于 03-25 18:13 ?1613次閱讀
    RDMA技術(shù)在Apache <b class='flag-5'>Spark</b>中的應(yīng)用

    基于DPU和HADOS-RACE加速Spark 3.x

    背景簡(jiǎn)介 Apache Spark(下文簡(jiǎn)稱Spark)是一種開源集群計(jì)算引擎,支持批/流計(jì)算、SQL分析、機(jī)器學(xué)習(xí)、圖計(jì)算等計(jì)算范式,以其強(qiáng)大的容錯(cuò)能力、可擴(kuò)展性、函數(shù)式API、多語言支持(SQL
    的頭像 發(fā)表于 03-25 18:12 ?1431次閱讀
    基于DPU和HADOS-RACE加速<b class='flag-5'>Spark</b> 3.x

    亥姆霍茲線圈并聯(lián)和串聯(lián)有何異同

    亥姆霍茲線圈并聯(lián)和串聯(lián)的異同點(diǎn)。 一、并聯(lián)連接 1.連接方式: 并聯(lián)連接指的是將兩個(gè)亥姆霍茲線圈的末端通過導(dǎo)線連接在一起,形成電路中的平行分支。 2.工作原理: 在并聯(lián)連接中,兩個(gè)亥姆霍茲線圈通過共享電流分流器將電流分流,使它
    的頭像 發(fā)表于 03-09 09:31 ?2575次閱讀

    NFC技術(shù)與RFID技術(shù)有哪些異同點(diǎn)

    NFC技術(shù)與RFID技術(shù)在一些方面相似,但也存在一些不同之處。以下是它們之間的主要異同點(diǎn): 相同點(diǎn): 都是無線通信技術(shù):NFC和RFID都是利用無線信號(hào)進(jìn)行數(shù)據(jù)傳輸?shù)耐ㄐ偶夹g(shù),它們都不需要通過物理
    的頭像 發(fā)表于 03-08 17:56 ?2540次閱讀
    主站蜘蛛池模板: 在线99热 | 中文字幕在线永久在线视频2020 | 亚洲伊人99综合网 | 91免费网站在线看入口黄 | 天天草狠狠干 | 天天添| 欧美成人a | 一区二区3区免费视频 | 天天射天天干天天舔 | 亚洲综合色就色手机在线观看 | 亚洲国产精品乱码在线观看97 | 综合色99 | 伊人一区二区三区 | 亚洲一区二区三区免费观看 | 性欧美性free | 一级片免费在线观看视频 | 99免费视频观看 | 免费能看的黄色网址 | 欧美激情第一欧美在线 | 日本高清色图 | 黄网站色在线视频免费观看 | 99热久久精品最新 | 男人的天堂网在线 | 九色综合久久综合欧美97 | 中文字幕第15页 | 美女黄视频免费 | 午夜免费观看_视频在线观看 | 中文字幕一区二区精品区 | 国产免费一级在线观看 | 色在线播放 | 国产午夜免费 | 又大又粗进出白浆直流动态图 | 亚洲综合激情九月婷婷 | 欧美综合一区二区三区 | 国产亚洲精品仙踪林在线播放 | 国产xxxxxx久色视频在 | 国产三级精品在线观看 | 激情综合六月 | 性xxxxfreexxxxx国产 | 亚洲视频在线一区 | 丁香婷婷成人 |