在上一篇文章中,我們用tokio實(shí)現(xiàn)了客戶(hù)端和服務(wù)器的基本框架并設(shè)置了toml格式的配置文件。在這一篇文章中,我們參考Redis的命令:GET、SET、PUBLISH和SUBSCRIBE,使用Protobuf來(lái)實(shí)現(xiàn)客戶(hù)端與服務(wù)器之間的通信協(xié)議層。為了處理Protobuf,我們加入了post庫(kù)。同時(shí)加入了tracing庫(kù)用于日志處理。Cargo.toml如下:
Protobuf在項(xiàng)目根目錄下新建cmd.proto,加入如下代碼:[dependencies]
anyhow = "1"
tokio = { version = "1.19", features = ["full"] }
serde = { version = "1", features = ["derive"] }
toml = "0.5"
tracing = "0.1"
tracing-subscriber = "0.3"
bytes = "1"
prost = "0.11"
[build-dependencies]
prost-build = "0.11"
1syntax="proto3";
2
3packagecmd;
4
5//命令請(qǐng)求
6messageCmdRequest{
7oneofreq_data{
8Getget=1;
9Setset=2;
10Publishpublish=3;
11Subscribesubscribe=4;
12Unsubscribeunsubscribe=5;
13}
14}
15
16//服務(wù)器的響應(yīng)
17messageCmdResponse{
18uint32status=1;
19stringmessage=2;
20bytesvalue=3;
21}
22
23//請(qǐng)求值命令
24messageGet{
25stringkey=1;
26}
27
28//存儲(chǔ)值命令
29messageSet{
30stringkey=1;
31bytesvalue=2;
32uint32expire=3;
33}
34
35//向Topic發(fā)布值命令
36messagePublish{
37stringtopic=1;
38bytesvalue=2;
39}
40
41//訂閱Topic命令
42messageSubscribe{
43stringtopic=1;
44}
45
46//取消訂閱命令
47messageUnsubscribe{
48stringtopic=1;
49uint32id=2;
50}
在src目錄下創(chuàng)建pb目錄,在根目錄下創(chuàng)建build.rs文件,加入如下代碼:
1fnmain(){
2letmutconf=prost_build::new();
3conf.bytes(&["."]);
4conf.type_attribute(".","#[derive(PartialOrd)]");
5conf.out_dir("src/pb")
6.compile_protos(&["cmd.proto"],&["."])
7.unwrap();
8}
在src/pb目錄下已經(jīng)自動(dòng)生成了cmd.rs文件。在src/pb目錄下創(chuàng)建mod.rs文件,加入如下代碼:
1usebytes::Bytes;
2
3usecrate::{cmd_request::ReqData,CmdRequest,Get,Publish,Set,Subscribe,Unsubscribe};
4
5pubmodcmd;
6
7implCmdRequest{
8//GET命令
9pubfnget(key:implInto) ->Self{
10Self{
11req_data:Some(ReqData::Get(Get{key:key.into()})),
12}
13}
14
15//SET命令
16pubfnset(key:implInto,value:Bytes,expire:u32) ->Self{
17Self{
18req_data:Some(ReqData::Set(Set{
19key:key.into(),
20value,
21expire,
22})),
23}
24}
25
26//PUBLISH命令
27pubfnpublish(topic:implInto,value:Bytes) ->Self{
28Self{
29req_data:Some(ReqData::Publish(Publish{
30topic:topic.into(),
31value,
32})),
33}
34}
35
36//訂閱命令
37pubfnsubscribe(topic:implInto) ->Self{
38Self{
39req_data:Some(ReqData::Subscribe(Subscribe{
40topic:topic.into(),
41})),
42}
43}
44
45//解除訂閱命令
46pubfnunsubscribe(topic:implInto,id:u32) ->Self{
47Self{
48req_data:Some(ReqData::Unsubscribe(Unsubscribe{
49topic:topic.into(),
50id,
51})),
52}
53}
54}
55
56implCmdResponse{
57pubfnnew(status:u32,message:String,value:Bytes)->Self{
58Self{
59status,
60message,
61value,
62}
63}
64}
在 src/lib.rs 中,引入pb模塊:
1modpb;
2pubusepb::*;
客戶(hù)端 & 服務(wù)器我們使用tokio-util庫(kù)的Frame里的LengthDelimitedCodec(根據(jù)長(zhǎng)度進(jìn)行編解碼)對(duì)protobuf協(xié)議進(jìn)行封包解包。在Cargo.toml里加入tokio-util依賴(lài):修改src/bin/kv_server.rs代碼:[dependencies]
......
futures = "0.3"
tokio-util = {version = "0.7", features = ["codec"]}
......
1#[tokio::main]
2asyncfnmain()->Result<(),Box> {
3tracing_subscriber::init();
4
5......
6
7loop{
8......
9
10tokio::spawn(asyncmove{
11//使用Frame的LengthDelimitedCodec進(jìn)行編解碼操作
12letmutstream=Framed::new(stream,LengthDelimitedCodec::new());
13whileletSome(Ok(mutbuf))=stream.next().await{
14//對(duì)客戶(hù)端發(fā)來(lái)的protobuf請(qǐng)求命令進(jìn)行拆包
15letcmd_req=CmdRequest::decode(&buf[..]).unwrap();
16info!("Receiveacommand:{:?}",cmd_req);
17
18buf.clear();
19
20//對(duì)protobuf的請(qǐng)求響應(yīng)進(jìn)行封包,然后發(fā)送給客戶(hù)端。
21letcmd_res=CmdResponse::new(200,"success".to_string(),Bytes::default());
22cmd_res.encode(&mutbuf).unwrap();
23stream.send(buf.freeze()).await.unwrap();
24}
25info!("Client{:?}disconnected",addr);
26});
27}
28}
修改src/bin/kv_client.rs代碼:
1#[tokio::main]
2asyncfnmain()->Result<(),Box> {
3tracing_subscriber::init();
4
5......
6
7//使用Frame的LengthDelimitedCodec進(jìn)行編解碼操作
8letmutstream=Framed::new(stream,LengthDelimitedCodec::new());
9letmutbuf=BytesMut::new();
10
11//創(chuàng)建GET命令
12letcmd_get=CmdRequest::get("mykey");
13cmd_get.encode(&mutbuf).unwrap();
14
15//發(fā)送GET命令
16stream.send(buf.freeze()).await.unwrap();
17info!("Send info successed!");
18
19//接收服務(wù)器返回的響應(yīng)
20whileletSome(Ok(buf))=stream.next().await{
21letcmd_res=CmdResponse::decode(&buf[..]).unwrap();
22info!("Receivearesponse:{:?}",cmd_res);
23}
24
25Ok(())
26}
我們打開(kāi)二個(gè)終端,分別輸入以下命令:服務(wù)器執(zhí)行結(jié)果:RUST_LOG=info cargo run --bin kv_server
RUST_LOG=info cargo run --bin kv_client
客戶(hù)端執(zhí)行結(jié)果:INFO kv_server: Listening on 127.0.0.1:19999 ......
INFO kv_server: Client: 127.0.0.1:50655 connected
INFO kv_server: Receive a command: CmdRequest { req_data: Some(Get(Get { key: "mykey" })) }
INFO kv_client: Send info successed!
INFO kv_client: Receive a response: CmdResponse { status: 200, message: "success", value: b"" }
服務(wù)器和客戶(hù)端都正常處理了收到的請(qǐng)求和響應(yīng)。 下一篇文章我們將在服務(wù)器端使用內(nèi)存來(lái)存儲(chǔ)客戶(hù)端發(fā)送過(guò)來(lái)的數(shù)據(jù)。 完整代碼:https://github.com/Justin02180218/kv_server_rust
審核編輯:湯梓紅
-
通信協(xié)議
+關(guān)注
關(guān)注
28文章
1008瀏覽量
40998 -
服務(wù)器
+關(guān)注
關(guān)注
13文章
9727瀏覽量
87431 -
客戶(hù)端
+關(guān)注
關(guān)注
1文章
298瀏覽量
17016
原文標(biāo)題:用Rust實(shí)現(xiàn)KV Server-2 協(xié)議層
文章出處:【微信號(hào):Rust語(yǔ)言中文社區(qū),微信公眾號(hào):Rust語(yǔ)言中文社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
一個(gè)服務(wù)器,多個(gè)客戶(hù)端,怎么向指定的客戶(hù)端發(fā)數(shù)據(jù)
如何實(shí)現(xiàn)服務(wù)器對(duì)指定客戶(hù)端的監(jiān)聽(tīng)?
labview-TCP多客戶(hù)端服務(wù)器
怎么使用Paho來(lái)實(shí)現(xiàn)和MQTT服務(wù)器的基本通信?
BTC設(shè)備服務(wù)器的系統(tǒng)搭建
4412開(kāi)發(fā)板Qt網(wǎng)絡(luò)編程-TCP實(shí)現(xiàn)服務(wù)器和客戶(hù)端
如何實(shí)現(xiàn)服務(wù)器和客戶(hù)端數(shù)據(jù)交互?
如何使用esp8266在服務(wù)器和客戶(hù)端之間發(fā)送數(shù)據(jù)?
當(dāng)WiFi信號(hào)變低時(shí),服務(wù)器和客戶(hù)端之間的TCP通信丟失,如何使客戶(hù)端重新連接?
服務(wù)器和客戶(hù)端之間的TCP通信丟失怎么處理?
TCP通信服務(wù)器端和客戶(hù)端同機(jī)互傳的簡(jiǎn)單示例程序免費(fèi)下載

Linux下TCP網(wǎng)絡(luò)編程-創(chuàng)建服務(wù)器與客戶(hù)端

MQTT中服務(wù)端和客戶(hù)端
服務(wù)端如何控制客戶(hù)端之間的信息通訊

服務(wù)器Server和客戶(hù)端Client的區(qū)別

評(píng)論