在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

golang中使用kafka的綜合指南

馬哥Linux運維 ? 來源:稀土掘金技術社區 ? 2023-11-30 11:18 ? 次閱讀

介紹

kafka是一個比較流行的分布式、可拓展、高性能、可靠的流處理平臺。在處理kafka的數據時,這里有確保處理效率和可靠性的多種最佳實踐。本文將介紹這幾種實踐方式,并通過sarama實現他們。

以下是一些kafka消費的最佳實踐:

選擇合適的提交策略:Kafka提供兩種提交策略,自動和手動。雖然自動操作很容易使用,但它可能會導致數據丟失或重復。手動提交提供了更高級別的控制,確保消息至少處理一次或恰好一次,具體取決于用例。

盡可能減少Kafka的傳輸次數:大批量讀取消息可以顯著提高吞吐量。這可以通過調整 fetch.min.bytes 和 fetch.max.wait.ms 等參數來實現。

盡可能使用消費者組:Kafka允許多個消費者組成一個消費者組來并行消費數據。這使得 Kafka 能夠將數據分發給一個組中的所有消費者,從而實現高效的數據消費。

調整消費者緩沖區大小:通過調整消費者的緩沖區大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根據消息的預期大小和消費者的內存容量進行調整。這可以提高消費者的表現。

處理rebalance:當新的消費者加入消費者組,或者現有的消費者離開時,Kafka會觸發rebalance以重新分配負載。在此過程中,消費者停止消費數據。因此,快速有效地處理重新平衡可以提高整體吞吐量。

監控消費者:使用 Kafka 的消費者指標來監控消費者的性能。定期監控可以幫助我們識別性能瓶頸并調整消費者的配置。

選擇合適的提交策略

1.自動提交

Sarama 的 ConsumerGroup 默認情況下會自動提交偏移量。這意味著它會定期提交已成功消費的消息的偏移量,這允許消費者在重新啟動或消費失敗時從中斷的地方繼續。

下面是一個自動提交的消費者組消費消息的例子:


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = true  
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second  
  
ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Panicf( "創建消費者組客戶端時出錯: %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "來自消費者的錯誤: %v" , err)  
    }  
}

根據config.Consumer.Offsets.AutoCommit.Interval可以看到,消費者會每秒自動提交offset。

2. 手動提交

手動提交使我們更好地控制何時提交消息偏移量。下面是一個手動提交的消費者組消費消息的例子:


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = false 
  
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config)  
if err != nil {  
    log.Panicf( "創建消費者組客戶端時出錯: %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "Error from Consumer: %v" , err)  
    }  
}  
  


type Consumer struct {}  
  
func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {  
    for msg : = range Claim.Messages() {  
        
        fmt.Printf( "Message topic:%q partition:%d offset:%d
" , msg.Topic, msg.Partition, msg.Offset)  


        
        sess.MarkMessage(msg, "" )  
    }  
    return nil  
}

在該示例中, 使用MarkMessage手動將消息標記為已處理,最終根據Consumer.Offsets.CommitInterval配置提交。另外這個例子省略了錯誤處理部分,開發時需要注意正確處理生產過程中出現的錯誤。

譯者注:這篇文章雖然是今年5月發布,但是這里的提交方式還是有些過時了,目前sarama已經廢棄了Consumer.Offsets.CommitInterval,相關配置目前在Consumer.Offsets.AutoCommit

盡可能減少Kafka的傳輸次數

減少kafka的傳輸次數可以通過優化從kafka中讀取和寫入數據的方式來實現:

1. 增加批次的大小

使用kafka批量發送消息的效果優于逐個發送消息,批次越大,kafka發送數據效率就越高。但是需要權衡延遲和吞吐量之間的關系。較大的批次雖然代表著更大的吞吐量,但也會增加延遲。因為批次越大,填充批次的時間也越久。

在Go中,我們可以在使用sarama包生成消息時設置批次大小:


config := sarama.NewConfig()  
config.Producer.Flush.Bytes = 1024 * 1024

以及獲取消息的批次大小


config := sarama.NewConfig()  
config.Consumer.Fetch.Default = 1024 * 1024

2. 使用長輪詢

長輪詢是指消費者輪詢時如果Kafka中沒有數據,則消費者將等待數據到達。這減少了往返次數,因為消費者不需要在沒有數據時不斷請求數據。


config := sarama.NewConfig() 
config .Consumer.MaxWaitTime = 500 *time.Millisecond

該配置告訴消費者在返回之前會等待500毫秒

3. 盡可能使用消費者組

消費者組是一組協同工作消費來自kafka主題的消息的消費者。消費者組允許我們在多個消費者之間分配消息,從而提供橫向拓展能力。使用消費者組時,kafka負責將分區分配給組中的消費者,并確保每個分區同時僅被一個消費者消費。

接下來是sarama中消費者組的使用:

使用消費者組需要實現一個ConsumerGroupHandler接口

該接口具有三個方法:Setup、Cleanup、 和ConsumeClaim


type exampleConsumerGroupHandler struct { 
} 


func  (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { 
    for message := range Claim.Messages() { 
        
        fmt.Printf( "Message: %s
" , string (message.Value)) 


        
        session.MarkMessage(message, "" ) 
    }
    返回 nil
 }

創建sarama.ConsumerGroup并開始消費:


brokers := []string{"localhost:9092"} 
topic := "example_topic"  
groupID := "example_consumer_group"  
  


consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Fatalf("Error creating consumer group: %v", err)  
}  
defer consumerGroup.Close()  
  


handler := &exampleConsumerGroupHandler{}  
  


for {  
    err := consumerGroup.Consume(context.Background(), []string{topic}, handler)  
    if err != nil {  
        log.Printf("Error consuming messages: %v", err)  
    }  
}

該示例設置了一個消費組,用于消費來自“example_topic”的消息。消費者組可以通過添加更多消費者來提高處理能力。

使用消費者組時,記得處理消費期間rebalance和錯誤。

調整消費者緩沖區大小

在sarama中,我們可以調整消費者緩沖區的大小,以調整消費者在處理消息之前可以在內存中保存的消息數量。

默認情況下,緩沖區大小設置為256,這代表Sarama在開始處理消息之前將在內存中保存最多256條消息。如果消費者速度很慢,增加緩沖區大小可能有助于提高吞吐量。但是,更大的緩沖區也會消耗更多的內存。

以下是如何增加緩沖區大小的例子:


config := sarama.NewConfig()  
config.Consumer.Return.Errors = true  
config.Version = sarama.V2_1_0_0  
config.Consumer.Offsets.Initial = sarama.OffsetOldest  


config.ChannelBufferSize = 500  
  


group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config)  
if err != nil {  
    panic(err)  
}  
  


ctx := context.Background()  
for {  
    topics := []string{topic}  
    handler := exampleConsumerGroupHandler{}  


    err := group.Consume(ctx, topics, &handler)  
    if err != nil {  
        panic(err)  
    }  
}

處理rebalance

當新消費者添加到消費者組或現有消費者離開消費者組時,kafka會重新平衡該組中的消費者。rebalance是kafka確保消費者組中的所有消費者不會消費同一分區的保證。

在sarama中,處理rebalance是通過 Setup 和CleanUp函數來完成的。

通過正確處理重新平衡事件,您可以確保應用程序正常處理消費者組的更改,例如消費者離開或加入,并且在這些事件期間不會丟失或處理兩次消息。

譯者注:其實更重要的是在ConsumeClaim函數在通道關閉時盡早退出,才能正確的進入CleanUp函數。

監控消費者

監控Kafka消費者對于確保系統的健康和性能至關重要,我們需要時刻關注延遲、處理時間和錯誤率的指標。

Golang沒有內置對 Kafka 監控的支持,但有幾個庫和工具可以幫助我們。讓我們看一下其中的一些:

Sarama的Metrics:Sarama 提供了一個指標注冊表,它報告了有助于監控的各種指標,例如請求、響應的數量、請求和響應的大小等。這些指標可以使用 Prometheus 等監控系統來收集和監控。

JMX Exporter:如果您在 JVM 上運行 Kafka, 則可以使用 JMX Exporter 將kafka的 MBeans 發送給Prometheus

Kafka Exporter:Kafka Exporter是一個第三方工具,可以提供有關Kafka的更詳細的指標。它可以提供消費者組延遲,這是消費kafka消息時要監控的關鍵指標。

Jaeger 或 OpenTelemetry:這些工具可用于分布式追蹤,這有助于追蹤消息如何流經系統以及可能出現瓶頸的位置。

日志:時刻關注應用程序日志,記錄消費者中的任何錯誤或異常行為。這些日志可以幫助我們診斷問題。

消費者組命令, 可以使用kafka-consumer-groups命令來描述消費者組的狀態。

請記住,不僅要追蹤這些指標,還要針對任何需要關注的場景設置警報。通過這些方法,我們可以在問題還在初始階段時快速做出響應。

以上工作有助于確保使用kafka的應用程序健壯、可靠且高效。

審核編輯:湯梓紅

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 數據
    +關注

    關注

    8

    文章

    7245

    瀏覽量

    91058
  • 參數
    +關注

    關注

    11

    文章

    1867

    瀏覽量

    32875
  • kafka
    +關注

    關注

    0

    文章

    53

    瀏覽量

    5363

原文標題:golang中使用kafka的綜合指南

文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦
    熱點推薦

    如何使用Golang連接MySQL

    首先我們來看如何使用Golang連接MySQL。
    的頭像 發表于 01-08 09:42 ?3826次閱讀
    如何使用<b class='flag-5'>Golang</b>連接MySQL

    Kafka讀取數據操作指南

    Kafka消費者——從 Kafka讀取數據
    發表于 09-16 06:42

    淺析kafka

    kafka常見問題
    發表于 09-29 10:09

    基于發布與訂閱的消息系統Kafka

    Kafka權威指南》——初識 Kafka
    發表于 03-05 13:46

    Kafka基礎入門文檔

    kafka系統入門教程(原理、配置、集群搭建、Java應用、Kafka-manager)
    發表于 03-12 07:22

    Kafka集群環境的搭建

    1、環境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發表于 01-05 17:55

    現代的服務端技術棧:Golang/Protobuf/gRPC詳解

    Golang又稱Go語言,是一個開源的、多用途的編程語言,由Google研發,并由于種種原因,正在日益流行。Golang已經有10年的歷史,并且據Google稱已經在生產環境中使用了接近7年的時間,這一點可能讓大多數人大跌眼鏡。
    的頭像 發表于 12-25 17:32 ?1354次閱讀

    Kafka的概念及Kafka的宕機

    問題要從一次Kafka的宕機開始說起。 筆者所在的是一家金融科技公司,但公司內部并沒有采用在金融支付領域更為流行的 RabbitMQ ,而是采用了設計之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發表于 08-27 11:21 ?2405次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機

    初探Golang內聯

    今天我們來聊聊 Golang 中的內聯。
    的頭像 發表于 12-13 09:51 ?1175次閱讀

    GoLang的安裝和使用

    GoLang的安裝和使用
    的頭像 發表于 01-13 14:06 ?1470次閱讀
    <b class='flag-5'>GoLang</b>的安裝和使用

    Kafka 的簡介

    ? 1 kafka簡介 2 為什么要用消息系統 3 kafka基礎知識 4 kafka集群架構 5 總結 ? 1 kafka簡介 其主要設計目標如下: 以時間復雜度為O(1)的方式提供
    的頭像 發表于 07-03 11:10 ?846次閱讀
    <b class='flag-5'>Kafka</b> 的簡介

    物通博聯5G-kafka工業網關實現kafka協議對接到云平臺

    Kafka協議是一種基于TCP層的網絡協議,用于在分布式消息傳遞系統Apache Kafka中發送和接收消息。Kafka協議定義了客戶端和服務器之間的通信方式和數據格式,允許客戶端發送消息到K
    的頭像 發表于 07-11 10:44 ?699次閱讀

    Kafka架構技術:Kafka的架構和客戶端API設計

    Kafka 給自己的定位是事件流平臺(event stream platform)。因此在消息隊列中經常使用的 "消息"一詞,在 Kafka 中被稱為 "事件"。
    的頭像 發表于 10-10 15:41 ?2698次閱讀
    <b class='flag-5'>Kafka</b>架構技術:<b class='flag-5'>Kafka</b>的架構和客戶端API設計

    kafka相關命令詳解

    kafka常用命令詳解
    的頭像 發表于 10-20 11:34 ?1268次閱讀

    kafka基本原理詳解

    今天浩道跟大家分享一篇關于kafka相關原理的硬核干貨,可以說即使你沒有接觸過kafka,也可以秒懂,一起看看!
    的頭像 發表于 01-03 09:57 ?1112次閱讀
    <b class='flag-5'>kafka</b>基本原理詳解
    主站蜘蛛池模板: 欧美一级特黄高清免费 | 深深激情网 | 天天爱天天做色综合 | 久草资源站在线 | 色综合天天五月色 | 九九热精品视频在线播放 | 天堂8资源在线官网资源 | 手机看片日韩永久福利盒子 | 国产小视频免费看 | 亚洲第一黄色网址 | 午夜一区二区免费视频 | 性欧美zoz0另类xxxx | 亚洲国产情侣偷自在线二页 | 性生交酡 | 日本aaaa级片| 日日干日日爽 | 黄色片免费看视频 | 午夜黄色大片 | 超h 高h 污肉1v1御书屋 | 国模无水印一区二区三区 | 国产三级a三级三级天天 | 天堂在线资源最新版 | 黄色网在线看 | 日本人六九视频69jzz免费 | 老色批视频| 国产v69| 三级视频中文字幕 | www.亚洲色图.com | 一本大道加勒比久久综合 | 色五五月| 天天色天天草 | 亚色在线观看 | 狠狠色噜噜综合社区 | 亚洲女同一区二区 | 美国一级大黄香蕉片 | 性色在线播放 | 色天使色婷婷丁香久久综合 | 视频在线高清完整免费观看 | 777色狠狠一区二区三区香蕉 | 五月天婷婷色 | 久久99久久精品97久久综合 |