隨著 OpenDAL 社區的不斷發展,新的抽象在不斷增加,為新的貢獻者參與開發帶來了不少負擔,不少維護者都希望對 OpenDAL 的內部實現有更深入的了解。與此同時,OpenDAL 的核心設計已經很長時間沒有大幅度的變化,為寫一個內部實現系列提供了可能。我想現在是時候寫一系列關于 OpenDAL 內部實現的文章,從維護者的角度來闡述 OpenDAL 如何設計,如何實現以及如何擴展。在 OpenDAL v0.40 即將發布之際,希望這系列文章能夠更好的幫助社區理解過去,掌握現在,并確定未來。
第一篇文章會先聊聊 OpenDAL 最常使用的數據讀取功能,我會從最外層的接口開始,然后按照 OpenDAL 的調用順序來逐步展開。讓我們開始吧!
整體框架
在開始介紹具體的 OpenDAL 接口之前,我們首先熟悉一下 OpenDAL 項目。
OpenDAL[1]是一個 Apache Incubator 項目,旨在幫助用戶從各種存儲服務中以統一的方式便捷高效訪問數據。它的項目愿景[2]是 “自由訪問數據”:
?Free from services: 任意服務都能通過原生接口自由訪問
?Free from implementations: 無論底層實現如何,都可以通過統一的方式調用
?Free to integrate: 能夠自由地與各種服務,語言集成
?Free to zero cost: 用戶不需要為用不到的功能付出開銷
在這套理念的基礎上,OpenDAL Rust Core 可以主要分成以下組成部分:
?Operator: 對用戶暴露的外層接口
?Layers: 不同中間件的具體實現
?Services: 不同服務的具體實現
所以從宏觀的角度上來看,OpenDAL 的數據讀取調用棧看起來會像是這樣:
所有 Layers 和 Services 都實現了統一了 Accessor 接口,在進行 Operator 構建時會抹除所有的類型信息。對 Operator 來說,不管用戶使用什么服務或者增加了多少中間件,所有的調用邏輯都是一致的。這一設計將 OpenDAL 的 API 拆分成了 Public API 和 Raw API 兩層,其中 Public API 直接暴露給用戶,提供便于使用的上層接口,而 Raw API 則是面向 OpenDAL 內部開發者提供,維護統一的內部接口,并提供一些便利的實現。
Operator
OpenDAL 的 Operator API 會盡可能遵循一致的調用范式,減少用戶的學習和使用成本。以read為例,OpenDAL 提供了以下 API:
?op.read(path): 將指定文件全部內容讀出
?op.reader(path): 創建一個 Reader 用來做流式讀取
?op.read_with(path).range(1..1024): 使用指定參數來讀取文件內容,比如說 range
?op.reader_with(path).range(1..1024): 使用指定參數來創建 Reader 做流式讀取
不難看出read更像是一個語法糖,用來方便用戶快速地進行文件讀取而不需要考慮AsyncRead等各種 trait。而reader則給予了用戶更多的靈活度,實現了AsyncSeek,AsyncRead等社區廣泛使用的 trait,允許用戶更靈活的讀取數據。read_with和reader_with則通過 Future Builder 系列函數,幫助用戶以更自然的方式來指定各種參數。
Operator 內部的邏輯看起來會是這樣:
它的主要工作是面向用戶封裝接口:
?完成OpRead的構建
?調用Accessor提供的read函數
?將返回的值包裹為Reader并在Reader的基礎上實現AsyncSeek,AsyncRead等接口
Layers
這里有一個隱藏的小秘密是 OpenDAL 會自動為 Service 套上一些 Layer 以實現一些內部邏輯,截止到本文完成的時候,OpenDAL 自動增加的 Layer 包括:
?ErrorContextLayer: 為所有的 Operation 返回的 error 注入 context 信息,比如scheme,path等
?CompleteLayer: 為服務補全必須的能力,比如說為 s3 增加 seek 支持
?TypeEraseLayer: 實現類型擦除,將Accessor中的關聯類型統一擦除,讓用戶使用時不需要攜帶泛型參數
這里的ErrorContextLayer和TypeEraseLayer都比較簡單不再贅述,重點聊聊CompleteLayer,它旨在以零開銷的方式為 OpenDAL 返回的Reader增加seek或者next支持,讓用戶不需要再重復實現。OpenDAL 在早期版本中通過不同的函數調用來返回Reader和SeekableReader,但是用戶的實際反饋并不是很好,幾乎所有用戶都在使用SeekableReader。因此后續 OpenDAL 在重構中將 seek 支持作為第一優先級加入了內部的Readtrait 中:
pubtraitRead:Unpin+Send+Sync{ ///Readbytesasynchronously. fnpoll_read(&mutself,cx:&mutContext<'_>,buf:&mut[u8])->Poll>; ///Seekasynchronously. /// ///Returns`Unsupported`errorifunderlyingreaderdoesn'tsupportseek. fnpoll_seek(&mutself,cx:&mutContext<'_>,pos:io::SeekFrom)->Poll >; ///Stream[`Bytes`]fromunderlyingreader. /// ///Returns`Unsupported`errorifunderlyingreaderdoesn'tsupportstream. /// ///ThisAPIexistsforavoidingbytescopyinginsideasyncruntime. ///Userscanpollbytesfromunderlyingreaderanddecidewhento ///read/consumethem. fnpoll_next(&mutself,cx:&mutContext<'_>)->Poll
在 OpenDAL 中實現一個服務的讀取能力就需要實現這個 trait,這是一個內部接口,不會直接暴露給用戶,其中:
?poll_read是最基礎的要求,所有服務都必須實現這一接口。
?當服務原生支持seek時,可以實現poll_seek,OpenDAL 會進行正確的 dispatch,比如說 local fs;
?而當服務原生支持next,即返回流式的 Bytes 時,可以實現poll_next,比如說基于 HTTP 的服務,他們底層是一個 TCP Stream,hyper 會將其封裝為一個 bytes stream。
通過Readtrait,OpenDAL 確保所有服務都能盡可能地暴露自己的原生支持能力,從而提供對不同服務都能實現高效的讀取。
在此 trait 的基礎上,OpenDAL 會根據各個服務支持的能力來進行補全:
?seek/next 都支持:直接返回
?不支持 next: 使用StreamableReader進行封裝以模擬 next 支持
?不支持 seek: 使用ByRangeSeekableReader進行封裝以模擬 seek 支持
?seek/next 均不支持:同時進行兩種封裝
ByRangeSeekableReader主要利用了服務支持 range read 的能力,當用戶進行 seek 的時候就 drop 當前 reader 并在指定的位置發起新的請求。
OpenDAL 通過CompleteLayer暴露出一個統一的 Reader 實現,用戶不需要考慮底層服務是否支持 seek,OpenDAL 總是會選擇最優的方式來發起請求。
Services
經過 Layers 的補全之后,就到調用 Service 具體實現的地方,這里分別以最常見的兩類服務fs和s3來舉例說明數據是如何讀取的。
Service fs
tokio::File實現了tokio::AsyncRead和tokio::AsyncSeek,通過使用async_compat::Compat,我們將其轉化為了futures::AsyncRead和futures::AsyncSeek。在此基礎上,我們提供了內置的函數oio::into_read_from_file將其轉化為實現了oio::Read的類型,最終的類型名為:oio::FromFileReader
oio::into_read_from_file實現中沒有什么特別復雜的地方,read 和 seek 基本上都是在調用傳入的 File 類型提供的函數。比較麻煩的地方是關于 seek 和 range 的正確處理:seek 到 range 右側是允許的行為,此時不會報錯,read 也只會返回空,但是 seek 到 range 左側是非法行為,Reader 必須返回InvalidInput以便于上層正確處理。
有趣的歷史:當初這塊實現的時候有問題,還是在 fuzz 測試中發現的。
Services s3
S3 是一個基于 HTTP 的服務,opendal 提供了大量基于 HTTP 的封裝以幫助開發者重用邏輯,只需要構建請求,并返回構造好的 Body 即可。OpenDAL Raw API 封裝了一套基于 reqwest 的接口,HTTP GET 接口會返回一個Response
///IncomingAsyncBodycarriesthecontentreturnedbyremoteservers. pubstructIncomingAsyncBody{ ///#TODO /// ///hyperreturns`implStream- >`butwecan't ///writethetypesinstable.Sowewillboxhere. /// ///After[TAIT](https://rust-lang.github.io/rfcs/2515-type_alias_impl_trait.html) ///hasbeenstable,wecanchange`IncomingAsyncBody`into`IncomingAsyncBody
`. inner:oio::Streamer, size:Option, consumed:u64, chunk:Option , }
這個 body 內部包含的 stream 是 reqwest 返回的 bytes stream,opendal 在此基礎上實現了 content length 檢查和 read 支持。
這里額外提一嘴關于 reqwest/hyper 的小坑:reqwets 和 hyper 并沒有檢查返回的 content length,所以一個非法的 server 可能會返回與預期的 content length 不符的數據量而非報錯,進而導致數據的行為不符合預期。OpenDAL 在這里專門增加了檢查,在數據不足時返回ContentIncomplete,并在數據超出預期時返回ContentTruncated,避免用戶收到非法的數據。
總結
本文自頂向下介紹了 OpenDAL 如何實現數據讀取:
?Operator 負責對用戶暴露易用的接口
?Layers 負責對服務的能力進行補全
?Services 負責不同服務的具體實現
在整個鏈路中 OpenDAL 都盡可能遵循零開銷的原則,優先使用服務原生提供能力,其次再考慮通過其他的方法進行模擬,最后才會返回不支持的報錯。通過這三層的設計,用戶不需要了解底層服務的細節,也不需要接入不同服務的 SDK 就可以輕松地調用op.read(path)來訪問任意存儲服務中的數據。
這就是: HowOpenDALread data freely!
-
接口
+關注
關注
33文章
8961瀏覽量
153295 -
API
+關注
關注
2文章
1563瀏覽量
63607 -
函數
+關注
關注
3文章
4372瀏覽量
64314 -
數據讀取
+關注
關注
0文章
9瀏覽量
6599
原文標題:OpenDAL 內部實現:數據讀取
文章出處:【微信號:Rust語言中文社區,微信公眾號:Rust語言中文社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
如何在MCS中實現ADC數據讀取?
ADS1299的配套軟件不支持讀取級聯的其他芯片的數據,如何實現讀取級聯的多個芯片的數據呢?
FPGA怎么實現控制CF從SDRAM中讀取數據及實現CF卡向SDRAM傳數據
實現 Labview 和SQL server進行數據的讀取和寫入
MOVC實現讀取程序存儲區域的靜態數據
基于CPLD的Flash讀取控制的設計與實現
衛星SAR數據讀取與保存方法研究與軟件實現
TensorFlow數據讀取機制分析

Windows Server實現RAID技術,保證數據的讀取速度和安全
使用STM32單片機實現AD7606并行讀取數據的代碼免費下載

評論