Skip to content

Commit 1c39dc7

Browse files
committed
added RPC example
1 parent 4345891 commit 1c39dc7

File tree

2 files changed

+268
-2
lines changed

2 files changed

+268
-2
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ version = "^2.1"
4141
features = ["async-io"]
4242

4343
[dependencies.flume]
44-
version = "^0.10"
44+
version = "^0.11"
4545
default-features = false
4646
features = ["async"]
4747

@@ -66,12 +66,13 @@ waker-fn = "^1.1"
6666

6767
[dev-dependencies]
6868
async-global-executor = "^2.0"
69-
futures-lite = "^1.7"
69+
futures-lite = "^2.2"
7070
serde_json = "^1.0"
7171
waker-fn = "^1.1"
7272
tokio = { version = "1.17.0", features = ["full"] }
7373
tokio-executor-trait = "2.1.0"
7474
tokio-reactor-trait = "1.1.0"
75+
uuid = { version = "1.7", features = ["v4"] }
7576

7677
[dev-dependencies.tracing-subscriber]
7778
version = "^0.3"

examples/rpc.rs

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
use futures_lite::stream::StreamExt;
2+
use lapin::{
3+
options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties,
4+
};
5+
use tokio::time::{sleep, Duration};
6+
use tracing::{error, info};
7+
use uuid::Uuid;
8+
9+
async fn open_channel() -> Result<Channel, Box<dyn std::error::Error>> {
10+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672".into());
11+
let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;
12+
let channel = conn.create_channel().await?;
13+
channel
14+
.confirm_select(ConfirmSelectOptions::default())
15+
.await?;
16+
Ok(channel)
17+
}
18+
19+
struct RpcClient {
20+
channel: Channel,
21+
reply_to: String,
22+
}
23+
24+
impl RpcClient {
25+
async fn try_new() -> Result<Self, Box<dyn std::error::Error>> {
26+
let channel = open_channel().await?;
27+
28+
channel
29+
.queue_declare(
30+
"rpc_queue",
31+
QueueDeclareOptions {
32+
durable: true,
33+
..Default::default()
34+
},
35+
FieldTable::default(),
36+
)
37+
.await
38+
.map_err(|e| format!("Failed to declare queue rpc_queue:{e}"))?;
39+
40+
let reply_to = "rpc_queue_reply_to_".to_owned() + Uuid::new_v4().to_string().as_str();
41+
channel
42+
.queue_declare(
43+
reply_to.as_str(),
44+
QueueDeclareOptions {
45+
exclusive: true,
46+
..Default::default()
47+
},
48+
FieldTable::default(),
49+
)
50+
.await
51+
.map_err(|e| format!("Failed to declare queue {reply_to}: {e}"))?;
52+
53+
Ok(Self { channel, reply_to })
54+
}
55+
56+
async fn rpc_call(&mut self, message: i64) -> Result<(), Box<dyn std::error::Error>> {
57+
let correlation_id = Uuid::new_v4().to_string();
58+
while {
59+
let properties = BasicProperties::default()
60+
.with_correlation_id(correlation_id.as_str().into())
61+
.with_reply_to(self.reply_to.as_str().into());
62+
info!(
63+
"RPC client: correlation_id {:?}",
64+
properties.correlation_id()
65+
);
66+
!self
67+
.channel
68+
.basic_publish(
69+
"",
70+
"rpc_queue",
71+
BasicPublishOptions::default(),
72+
message.to_string().as_bytes(),
73+
properties,
74+
)
75+
.await
76+
.map_err(|e| format!("Failed to publish message: {e}"))?
77+
.await
78+
.map_err(|e| format!("Publish confirm error: {e}"))?
79+
.is_ack()
80+
} {
81+
error!("RPC client: did not get ack message, will retry sending it");
82+
sleep(Duration::from_millis(100)).await;
83+
}
84+
info!("RPC client: sent message: {}", message);
85+
86+
let mut consumer = self
87+
.channel
88+
.basic_consume(
89+
self.reply_to.as_str(),
90+
"",
91+
BasicConsumeOptions::default(),
92+
FieldTable::default(),
93+
)
94+
.await
95+
.map_err(|e| {
96+
format!(
97+
"Failed to create consumer for queue {}: {}",
98+
self.reply_to, e
99+
)
100+
})?;
101+
while let Some(delivery) = consumer.next().await {
102+
let delivery = delivery?;
103+
match delivery.properties.correlation_id() {
104+
Some(x) if x.as_str() == correlation_id => {}
105+
_ => {
106+
error!("RPC client: invalid correlation_id in delivery");
107+
delivery.ack(BasicAckOptions::default()).await?;
108+
continue;
109+
}
110+
}
111+
112+
info!(
113+
"RPC client: received message {}",
114+
std::str::from_utf8(&delivery.data[..])?
115+
);
116+
117+
delivery.ack(BasicAckOptions::default()).await?;
118+
break;
119+
}
120+
121+
Ok(())
122+
}
123+
}
124+
125+
async fn server() -> Result<(), Box<dyn std::error::Error>> {
126+
loop {
127+
let channel = open_channel().await?;
128+
channel
129+
.queue_declare(
130+
"rpc_queue",
131+
QueueDeclareOptions {
132+
durable: true,
133+
..Default::default()
134+
},
135+
FieldTable::default(),
136+
)
137+
.await
138+
.map_err(|e| format!("Failed to declare queue rpc_queue:{e}"))?;
139+
140+
let mut consumer = channel
141+
.basic_consume(
142+
"rpc_queue",
143+
"",
144+
BasicConsumeOptions::default(),
145+
FieldTable::default(),
146+
)
147+
.await
148+
.map_err(|e| format!("Failed to create consumer for queue \"rpc_queue\": {e}"))?;
149+
150+
while let Some(delivery) = consumer.next().await {
151+
let delivery = delivery?;
152+
let received_message = std::str::from_utf8(&delivery.data[..])?;
153+
let send_message = received_message.parse::<i64>()?;
154+
let send_message = send_message + 100;
155+
let send_message = send_message.to_string();
156+
157+
info!("RPC server: received message: {}", received_message);
158+
let correlation_id = match delivery.properties.correlation_id() {
159+
Some(correlation_id) => correlation_id,
160+
_ => {
161+
error!("RPC server: invalid correlation_id in delivery");
162+
delivery.ack(BasicAckOptions::default()).await?;
163+
continue;
164+
}
165+
};
166+
let reply_to = match delivery.properties.reply_to() {
167+
Some(reply_to) => reply_to,
168+
_ => {
169+
error!("RPC server: invalid correlation_id in delivery");
170+
delivery.ack(BasicAckOptions::default()).await?;
171+
continue;
172+
}
173+
};
174+
175+
while {
176+
let properties =
177+
BasicProperties::default().with_correlation_id(correlation_id.clone());
178+
info!(
179+
"RPC server: correlation_id {:?}",
180+
properties.correlation_id()
181+
);
182+
!channel
183+
.basic_publish(
184+
"",
185+
reply_to.as_str(),
186+
BasicPublishOptions::default(),
187+
send_message.as_bytes(),
188+
properties,
189+
)
190+
.await
191+
.map_err(|e| format!("Failed to publish message: {e}"))?
192+
.await
193+
.map_err(|e| format!("Publish confirm error: {e}"))?
194+
.is_ack()
195+
} {
196+
error!("RPC server: did not get ack message, will retry sending it");
197+
sleep(Duration::from_millis(100)).await;
198+
}
199+
info!("RPC server: sent message {}", send_message);
200+
201+
delivery.ack(BasicAckOptions::default()).await?;
202+
break;
203+
}
204+
}
205+
}
206+
207+
#[tokio::main]
208+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
209+
if std::env::var("RUST_LOG").is_err() {
210+
std::env::set_var("RUST_LOG", "info");
211+
}
212+
tracing_subscriber::fmt::init();
213+
214+
// Delete the queue
215+
// before starting the server
216+
open_channel()
217+
.await
218+
.map_err(|e| format!("Failed to open channel: {e}"))?
219+
.queue_delete("rpc_queue", QueueDeleteOptions::default())
220+
.await
221+
.map_err(|e| format!("Failed to delete queue rpc_queue: {e}"))?;
222+
223+
// In practice, the server and client could run
224+
// on distinct machines
225+
tokio::join!(
226+
async {
227+
loop {
228+
if let Err(e) = server().await {
229+
println!("Error in server: {e}.");
230+
}
231+
println!("Restarting server in 1 second.");
232+
sleep(Duration::from_secs(1)).await;
233+
}
234+
},
235+
async {
236+
// Create a client instance
237+
let mut client = loop {
238+
let client = RpcClient::try_new().await;
239+
match client {
240+
Ok(client) => break client,
241+
Err(e) => {
242+
error!("Error creating client: {e}.");
243+
sleep(Duration::from_secs(1)).await;
244+
}
245+
}
246+
};
247+
248+
// Send rpc requests and wait for the response
249+
for i in 0.. {
250+
match client.rpc_call(i).await {
251+
Ok(_) => {
252+
info!("Request successful");
253+
}
254+
Err(e) => {
255+
error!("Client error while sending request: {e}.");
256+
}
257+
}
258+
info!("Sending new request in 10 second.");
259+
sleep(Duration::from_secs(10)).await;
260+
}
261+
},
262+
);
263+
264+
Ok(())
265+
}

0 commit comments

Comments
 (0)