Skip to content

Commit 017d99a

Browse files
committed
simulate replica repair but need to work more on it
1 parent 767555e commit 017d99a

File tree

2 files changed

+109
-4
lines changed

2 files changed

+109
-4
lines changed

README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Raft-rs
22
An understandable, fast, scalable and optimized implementation of the [Raft algorithm](https://en.wikipedia.org/wiki/Raft_(algorithm)).
3-
It is asynchronous(built on tokio runtime) and supports zero-copy. It does not assume storage to be non-malicious, if corrupted, it will repair the logs via peer-to-peer communication.
3+
It is asynchronous (built on the tokio runtime) and supports zero-copy. It does not assume storage to be non-malicious; if the logs are corrupted, it will repair them via peer-to-peer communication.
44

55
## Note
66
- This project is still under development and is not yet production-ready. It is not recommended to use this in production environments.
@@ -15,7 +15,7 @@ It is asynchronous(built on tokio runtime) and supports zero-copy. It does not a
1515
- [x] Scalable
1616
- [x] Zero-Copy support
1717
- [x] Asynchronous
18-
- [x] Default Leader
18+
- [x] Default Leader
1919
- [x] Leadership preference
2020
- [x] Log compaction
2121
- [x] Tigerbeetle style replica repair
@@ -25,10 +25,9 @@ It is asynchronous(built on tokio runtime) and supports zero-copy. It does not a
2525
## To-Do
2626
- [ ] Production-ready
2727
- [ ] Test replica repair thoroughly
28-
- [ ] io_uring support
28+
- [ ] io_uring support for linux
2929
- [ ] Complete batch write implementation
3030
- [ ] Improve Log compaction
31-
- [ ] Improve error handling
3231
- [ ] RDMA support
3332
- [ ] Add more comprehensive tests
3433
- [ ] Enhance documentation

examples/simulate_replica_repair.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Organization: SpacewalkHq
2+
// License: MIT License
3+
4+
// We create a cluster of 5 nodes and simulate different scenarios of storage failure and recovery.
5+
6+
use raft_rs::cluster::{ClusterConfig, NodeMeta};
7+
use raft_rs::log::get_logger;
8+
use raft_rs::server::{Server, ServerConfig};
9+
use rand::Rng;
10+
use slog::{info, warn};
11+
use std::collections::HashMap;
12+
use std::fs;
13+
use std::net::SocketAddr;
14+
use std::str::FromStr;
15+
use tokio::time::{sleep, Duration};
16+
17+
#[tokio::main]
18+
async fn main() {
19+
let log = get_logger();
20+
21+
// Define cluster configuration
22+
let cluster_nodes = vec![1, 2, 3, 4, 5];
23+
let peers = vec![
24+
NodeMeta::from((1, SocketAddr::from_str("127.0.0.1:5001").unwrap())),
25+
NodeMeta::from((2, SocketAddr::from_str("127.0.0.1:5002").unwrap())),
26+
NodeMeta::from((3, SocketAddr::from_str("127.0.0.1:5003").unwrap())),
27+
NodeMeta::from((4, SocketAddr::from_str("127.0.0.1:5004").unwrap())),
28+
NodeMeta::from((5, SocketAddr::from_str("127.0.0.1:5005").unwrap())),
29+
];
30+
let cluster_config = ClusterConfig::new(peers.clone());
31+
32+
// Create server configs
33+
let configs: Vec<_> = peers
34+
.clone()
35+
.iter()
36+
.map(|n| ServerConfig {
37+
election_timeout: Duration::from_millis(200),
38+
address: n.address,
39+
default_leader: Some(1),
40+
leadership_preferences: HashMap::new(),
41+
storage_location: Some("logs/".to_string()),
42+
})
43+
.collect();
44+
45+
// Start servers asynchronously
46+
let mut server_handles = vec![];
47+
for (i, config) in configs.into_iter().enumerate() {
48+
let id = cluster_nodes[i];
49+
let cc = cluster_config.clone();
50+
let server_handle = tokio::spawn(async move {
51+
// Simulate storage corruption when starting up
52+
let storage_location = "logs".to_string();
53+
let corrupted = rand::thread_rng().gen_bool(0.3); // 30% chance of corruption
54+
if corrupted {
55+
fs::create_dir_all(&storage_location).unwrap();
56+
fs::write(format!("{}server_{}.log", storage_location, id), b"").unwrap(); // Simulate corruption
57+
warn!(get_logger(), "Storage for server {} is corrupted", id);
58+
}
59+
60+
let mut server = Server::new(id, config, cc).await;
61+
server.start().await;
62+
});
63+
server_handles.push(server_handle);
64+
}
65+
info!(log, "Cluster is up and running");
66+
67+
// Simulate a random storage failure and recovery while servers are running
68+
let mut rng = rand::thread_rng();
69+
for _ in 0..10 {
70+
let sleep_time = rng.gen_range(3..=5);
71+
sleep(Duration::from_secs(sleep_time)).await;
72+
73+
let server_to_fail = rng.gen_range(1..=5);
74+
warn!(log, "Simulating storage corruption for server {}", server_to_fail);
75+
76+
// Simulate storage corruption on a running server
77+
let storage_path = format!("logs/");
78+
fs::create_dir_all(&storage_path).unwrap();
79+
fs::write(format!("{}server_{}.log", storage_path, server_to_fail), b"").unwrap(); // Simulate corruption
80+
81+
// Restart the corrupted server to simulate recovery
82+
let cc: ClusterConfig = cluster_config.clone();
83+
let server_handle = tokio::spawn(async move {
84+
let config = ServerConfig {
85+
election_timeout: Duration::from_millis(200),
86+
address: SocketAddr::from_str(format!("127.0.0.1:{}", 5000 + server_to_fail).as_str()).unwrap(),
87+
default_leader: Some(1),
88+
leadership_preferences: HashMap::new(),
89+
storage_location: Some(storage_path.clone()),
90+
};
91+
let mut server = Server::new(server_to_fail.try_into().unwrap(), config, cc).await;
92+
server.start().await;
93+
// Handle recovery of corrupted storage
94+
info!(get_logger(), "Server {} has recovered from storage corruption", server_to_fail);
95+
});
96+
97+
server_handles[server_to_fail - 1] = server_handle;
98+
}
99+
100+
// Wait for all server tasks to complete (if they haven't been aborted)
101+
for handle in server_handles {
102+
let _ = handle.await;
103+
}
104+
105+
info!(log, "Test completed successfully.");
106+
}

0 commit comments

Comments
 (0)