Skip to content

Commit 8f08240

Browse files
authored
fix storage (#32)
Signed-off-by: dierbei <[email protected]>
1 parent 85ec6c2 commit 8f08240

File tree

7 files changed

+380
-192
lines changed

7 files changed

+380
-192
lines changed

examples/simple_run.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ use slog::{error, info};
99
use std::collections::HashMap;
1010
use std::net::SocketAddr;
1111
use std::str::FromStr;
12-
use std::thread;
13-
use tokio::runtime::Runtime;
1412
use tokio::time::Duration;
1513

1614
use raft_rs::network::{NetworkLayer, TCPManager};
@@ -34,7 +32,7 @@ async fn main() {
3432
.clone()
3533
.iter()
3634
.map(|n| ServerConfig {
37-
election_timeout: Duration::from_millis(200),
35+
election_timeout: Duration::from_millis(1000),
3836
address: n.address,
3937
default_leader: Some(1u32),
4038
leadership_preferences: HashMap::new(),
@@ -47,20 +45,18 @@ async fn main() {
4745
for (i, config) in configs.into_iter().enumerate() {
4846
let id = cluster_nodes[i];
4947
let cc = cluster_config.clone();
50-
handles.push(thread::spawn(move || {
51-
let rt = Runtime::new().unwrap();
52-
let mut server = Server::new(id, config, cc);
53-
rt.block_on(server.start());
48+
handles.push(tokio::spawn(async move {
49+
let mut server = Server::new(id, config, cc).await;
50+
server.start().await;
5451
}));
5552
}
5653

5754
// Simulate a client request after some delay
5855
tokio::time::sleep(Duration::from_secs(20)).await;
5956
client_request(1, 42u32).await;
6057
tokio::time::sleep(Duration::from_secs(2)).await;
61-
// Join all server threads
6258
for handle in handles {
63-
handle.join().unwrap();
59+
handle.await.unwrap();
6460
}
6561
}
6662

examples/simulate_add_node.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ use slog::error;
77
use std::collections::HashMap;
88
use std::net::SocketAddr;
99
use std::str::FromStr;
10-
use std::thread;
11-
use tokio::runtime::Runtime;
1210
use tokio::time::Duration;
1311

1412
use raft_rs::network::{NetworkLayer, TCPManager};
@@ -17,15 +15,15 @@ use raft_rs::server::{Server, ServerConfig};
1715
#[tokio::main]
1816
async fn main() {
1917
// Define cluster configuration
20-
let mut cluster_nodes = vec![1, 2, 3, 4, 5];
18+
let cluster_nodes = vec![1, 2, 3, 4, 5];
2119
let peers = vec![
2220
NodeMeta::from((1, SocketAddr::from_str("127.0.0.1:5001").unwrap())),
2321
NodeMeta::from((2, SocketAddr::from_str("127.0.0.1:5002").unwrap())),
2422
NodeMeta::from((3, SocketAddr::from_str("127.0.0.1:5003").unwrap())),
2523
NodeMeta::from((4, SocketAddr::from_str("127.0.0.1:5004").unwrap())),
2624
NodeMeta::from((5, SocketAddr::from_str("127.0.0.1:5005").unwrap())),
2725
];
28-
let mut cluster_config = ClusterConfig::new(peers.clone());
26+
let cluster_config = ClusterConfig::new(peers.clone());
2927
// Create server configs
3028
let configs: Vec<_> = peers
3129
.clone()
@@ -44,10 +42,9 @@ async fn main() {
4442
for (i, config) in configs.into_iter().enumerate() {
4543
let id = cluster_nodes[i];
4644
let cc = cluster_config.clone();
47-
handles.push(thread::spawn(move || {
48-
let rt = Runtime::new().unwrap();
49-
let mut server = Server::new(id, config, cc);
50-
rt.block_on(server.start());
45+
handles.push(tokio::spawn(async move {
46+
let mut server = Server::new(id, config, cc).await;
47+
server.start().await;
5148
}));
5249
}
5350

@@ -66,28 +63,25 @@ async fn main() {
6663
};
6764

6865
// Launching a new node
69-
handles.push(thread::spawn(move || {
70-
let rt = Runtime::new().unwrap();
71-
let mut server = Server::new(new_node_id, new_node_conf, cluster_config);
72-
rt.block_on(server.start());
66+
handles.push(tokio::spawn(async move {
67+
let mut server = Server::new(new_node_id, new_node_conf, cluster_config).await;
68+
server.start().await;
7369
}));
7470

7571
// Simulate sending a Raft Join request after a few seconds
7672
// Because we need to wait until the new node has started
7773
tokio::time::sleep(Duration::from_secs(3)).await;
7874
add_node_request(new_node_id, new_node_address).await;
7975

80-
// Wait for all servers to finish
8176
for handle in handles {
82-
handle.join().unwrap();
77+
handle.await.unwrap();
8378
}
8479
}
8580

8681
async fn add_node_request(new_node_id: u32, addr: SocketAddr) {
8782
let log = get_logger();
8883

89-
let server_address = addr;
90-
let network_manager = TCPManager::new(server_address);
84+
let network_manager = TCPManager::new(addr);
9185

9286
let request_data = vec![
9387
new_node_id.to_be_bytes().to_vec(),
@@ -98,7 +92,13 @@ async fn add_node_request(new_node_id: u32, addr: SocketAddr) {
9892
.concat();
9993

10094
// Let's assume that 5001 is the port of the leader node.
101-
if let Err(e) = network_manager.send(&server_address, &request_data).await {
95+
if let Err(e) = network_manager
96+
.send(
97+
&SocketAddr::from_str("127.0.0.1:5001").unwrap(),
98+
&request_data,
99+
)
100+
.await
101+
{
102102
error!(log, "Failed to send client request: {}", e);
103103
}
104104
}

examples/simulate_node_failure.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async fn main() {
4545
let id = cluster_nodes[i];
4646
let cc = cluster_config.clone();
4747
let server_handle = tokio::spawn(async move {
48-
let mut server = Server::new(id, config, cc);
48+
let mut server = Server::new(id, config, cc).await;
4949
server.start().await;
5050
});
5151
server_handles.push(server_handle);
@@ -77,7 +77,7 @@ async fn main() {
7777
};
7878
let cc = cluster_config.clone();
7979
let server_handle = tokio::spawn(async move {
80-
let mut server = Server::new(server_to_stop.try_into().unwrap(), config, cc);
80+
let mut server = Server::new(server_to_stop.try_into().unwrap(), config, cc).await;
8181
server.start().await;
8282
});
8383
server_handles[server_to_stop - 1] = server_handle;

src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ pub enum NetworkError {
3333
ConnectError(SocketAddr),
3434
#[error("Failed binding to {0}")]
3535
BindError(SocketAddr),
36-
#[error("Broadcast failed")]
37-
BroadcastError,
36+
#[error("Broadcast failed, errmsg: {0}")]
37+
BroadcastError(String),
3838
}
3939

4040
#[derive(Error, Debug)]

src/network.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl NetworkLayer for TCPManager {
7979
.into_iter()
8080
.collect::<std::result::Result<_, _>>()
8181
// FIXME: We should let client decide what to do with the errors
82-
.map_err(|_e| NetworkError::BroadcastError)?;
82+
.map_err(|e| NetworkError::BroadcastError(e.to_string()))?;
8383
Ok(())
8484
}
8585

0 commit comments

Comments
 (0)