Apache Pulsar
Apache Pulsar是Apache軟件基金會頂級項目,是下一代云原生分布式消息流平臺。
Pulsar 作為下一代云原生分布式消息流平臺,支持 多租戶、持久化存儲、多機房跨區(qū)域數(shù)據(jù)復制 ,具有強一致性、高吞吐以及低延時的高可擴展流數(shù)據(jù)存儲特性, 內(nèi)置諸多其他系統(tǒng)商業(yè)版本才有的特性,是云原生時代解決實時消息流數(shù)據(jù)傳輸、存儲和計算的最佳解決方案。
Pulsar簡介
- 系統(tǒng)架構(gòu)
- 功能特色
租戶和命名空間(namespace)是 Pulsar 支持多租戶的兩個核心概念。在租戶級別,Pulsar 為特定的租戶預留合適的存儲空間、應用授權(quán)與認證機制。在命名空間級別,Pulsar 有一系列的配置策略(policy),包括存儲配額、流控、消息過期策略和命名空間之間的隔離策略。
Pulsar 做了隊列模型和流模型的統(tǒng)一,在 Topic 級別只需保存一份數(shù)據(jù),同一份數(shù)據(jù)可多次消費。以流式、隊列等方式計算不同的訂閱模型大大提升了靈活度。
Pulsar 使用計算與存儲分離的云原生架構(gòu),數(shù)據(jù)從 Broker 搬離,存在共享存儲內(nèi)部。上層是無狀態(tài) Broker,復制消息分發(fā)和服務;下層是持久化的存儲層 Bookie 集群。Pulsar 存儲是分片的,這種構(gòu)架可以避免擴容時受限制,實現(xiàn)數(shù)據(jù)的獨立擴展和快速恢復。
Pulsar 原生支持跨地域復制,因此 Pulsar 可以跨不同地理位置的數(shù)據(jù)中心復制數(shù)據(jù)。當數(shù)據(jù)中心中斷或網(wǎng)絡分區(qū)時,在多個數(shù)據(jù)中心存有消息副本尤為重要,提高可用性。
Pulsar Functions 是基于 Pulsar 的輕量級流處理方式。Pulsar Functions 直接部署在 broker 節(jié)點上(或作為 Kubernetes 集群中的容器)。通過 Pulsar Functions,Pulsar 可以直接解決許多流處理任務,簡化操作。 - 支持客戶端
Pulsar安裝與部署
目前Pulsar不支持Window,下面通過Docker進行安裝,可以參考官網(wǎng)https://pulsar.apache.org/docs/next/getting-started-docker/
同時可以安裝Pulsar Manager,具體操作可以參考官方文檔 https://pulsar.apache.org/docs/next/administration-pulsar-manager/
其中Pulsar Manager 是一個網(wǎng)頁式可視化管理與監(jiān)測工具,支持多環(huán)境下的動態(tài)配置??捎糜诠芾砗捅O(jiān)測租戶、命名空間、topic、訂閱、broker、集群等。
- window環(huán)境使用docker推薦使用Docker Desktop,和linux一樣可以通過docker命令管理鏡像、部署容器等操作。
打開并啟動Docker Desktop后,在終端執(zhí)行命令執(zhí)行
_> docker search pulsar
可以查詢到pulsar相關(guān)的鏡像
- 鏡像下載
這里我們選擇分別下載紅框的兩個鏡像,執(zhí)行命令
_> docker pull apachepulsar/pulsar _> docker pull apachepulsar/pulsar-manager
- 啟動
- 啟動Pulsar
docker run -it -p 6650:6650 -p 8080:8080
--mount source=pulsardata,target=/pulsar/data
--mount source=pulsarconf,target=/pulsar/conf
apachepulsar/pulsar bin/pulsar standalone
- 啟動Pulsar Manager
docker run --name pulsar-manager -dit
-p 9527:9527 -p 7750:7750
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties
apachepulsar/pulsar-manager
添加用戶:
for /f "tokens=1" %A in ('curl http://localhost:7750/pulsar-manager/csrf-token') do set CSRF_TOKEN=%A
curl -X PUT "X-XSRF-TOKEN: %CSRF_TOKEN%" -H "Cookie: XSRF-TOKEN=%CSRF_TOKEN%;"
-H "Content-Type: application/json" -d "{"name": "admin", "password": "123456", "description": "super user admin", "email": "admin@test.com"}"
"http://localhost:7750/pulsar-manager/users/superuser"
訪問:
http://localhost:9527/
用戶名密碼:admin/123456
配置environments:
這里需要保證Pulsar Manager應用服務能夠訪問到Pulsar應用,由于都是通過Docker部署,配置Service URL需要使用網(wǎng)絡IP,不要用localhost。
管理界面:
Pulsar與SpringBoot集成
- springboot version : 2.3.7.RELEASE
- pulsar client: 2.10.2
- 通過Properties簡單定義一些Broker相關(guān)的屬性
@Data
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {
private String cluster;
private String namespace;
private String serverUrl;
private String token;
}
- 通過配置定義了一些常用的組件,比如生產(chǎn)、消費工廠
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
public class PulsarBootstrapConfiguration {
private final PulsarProperties properties;
public PulsarBootstrapConfiguration(PulsarProperties properties) {
this.properties = properties;
}
@Bean(destroyMethod = "close")
public PulsarClient pulsarClient() throws PulsarClientException {
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(properties.getServerUrl());
return clientBuilder.build();
}
@Bean
public PulsarProducerFactory pulsarProducerFactory() throws PulsarClientException {
return new PulsarProducerFactory(pulsarClient(), properties);
}
@Bean
public PulsarConsumerFactory pulsarConsumerFactory() throws PulsarClientException {
return new PulsarConsumerFactory(pulsarClient(), properties);
}
}
- 啟動服務,在服務啟動后,通過實現(xiàn)SmartInitializingSingleton接口,完成容器基本啟動(不包含Lazy的Bean)后,開始對消費者Consumer監(jiān)聽
@Slf4j
@SpringBootApplication
public class PulsarApplication implements SmartInitializingSingleton {
@Autowired
private PulsarConsumerFactory consumerFactory;
public static void main(String[] args) {
SpringApplication.run(PulsarApplication.class,args);
}
@Override
public void afterSingletonsInstantiated() {
startConsumerListener();
}
private void startConsumerListener(){
Consumer< String > consumer = createConsumer();
if( consumer != null ){
while (!Thread.currentThread().isInterrupted()){
CompletableFuture< ? extends Message< ? >> completableFuture = consumer.receiveAsync();
Message< ? > message = null;
try {
message = completableFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("錯誤",e);
} catch (ExecutionException e) {
log.error("錯誤",e);
}
if( message!=null ){
try {
log.info(" 接收消息:{} ", message.getValue() );
consumer.acknowledge(message);
} catch (PulsarClientException e) {
consumer.negativeAcknowledge(message);
throw new RuntimeException(e);
}
}
}
}
}
private Consumer< String > createConsumer() {
try {
return consumerFactory.getConsumer(Constants.TOPIC_DEMO);
} catch (PulsarClientException e) {
log.error("創(chuàng)建consumer出錯:{}", e.getMessage(),e);
}
return null;
}
}
- 消息發(fā)送測試
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class PulsarBootTests {
@Autowired
private PulsarProducerFactory producerFactory;
@Test
public void sendMessage() throws PulsarClientException {
Producer producer = producerFactory.getProducer(Constants.TOPIC_DEMO);
producer.send(" 測試消息: " + new Date());
producer.close();
}
}
- 檢查消息接收情況
2023-02-05 12:05:14.043 INFO 23472 --- [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl : [TOPIC_DEMO] [sub-TOPIC_DEMO] [7c2b2] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2023-02-05 12:06:16.425 INFO 23472 --- [ main] com.sucl.pulsar.PulsarApplication : 接收消息: 測試消息: Sun Feb 05 12:06:16 CST 2023
結(jié)束語
該篇主要通過官網(wǎng)對Apache Pulsar做了簡單的了解與嘗試,同時基于SpringBoot,以簡單的示例代碼實現(xiàn)了消息的發(fā)送與接收,其中各個組件僅僅使用了默認的配置,在生產(chǎn)環(huán)境需要根據(jù)Pulsar的特性以及官方API使其具有擴展性與易用性。
-
數(shù)據(jù)傳輸
+關(guān)注
關(guān)注
9文章
1983瀏覽量
65286 -
存儲
+關(guān)注
關(guān)注
13文章
4451瀏覽量
86789 -
容器
+關(guān)注
關(guān)注
0文章
503瀏覽量
22299 -
Apache
+關(guān)注
關(guān)注
0文章
64瀏覽量
12604
發(fā)布評論請先 登錄
相關(guān)推薦
Apache Spark 1.6預覽版新特性展示

10 lead Pulsar ADCs Evaluation Software

PulSAR ADC PMOD Labview Evaluation Software

PULSAR模擬電壓可變衰減器特性
Apache與Weblogic的整合

Apache2+tomcat5.5集群及Apache負載均衡配置實例

評論