Skip to content

Commit 7f14cd0

Browse files
committed
1. add cn.abelib.jodis.utils.Logger; 2. add 1:1 nio reactor server
1 parent cc26f0c commit 7f14cd0

28 files changed

+1426
-38
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ target
2828

2929
log/*
3030

31-
.DS_Store
31+
.DS_Store
32+
33+
conf/*
Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,86 @@
11
package cn.abelib.jodis;
22

3-
import cn.abelib.jodis.impl.JodisDb;
4-
import cn.abelib.jodis.protocol.Request;
5-
import cn.abelib.jodis.protocol.Response;
3+
import cn.abelib.jodis.server.JodisConfig;
4+
import cn.abelib.jodis.server.JodisServer;
5+
import cn.abelib.jodis.utils.Logger;
6+
import cn.abelib.jodis.utils.PropertiesUtils;
7+
8+
import java.io.Closeable;
9+
import java.nio.file.Files;
10+
import java.nio.file.Path;
11+
import java.nio.file.Paths;
12+
import java.util.Properties;
613

7-
import java.io.IOException;
814

915
/**
1016
* @Author: abel.huang
1117
* @Date: 2020-07-06 23:51
1218
* Java Object Dictionary Server
1319
*/
14-
public class Jodis {
15-
private JodisDb jodisDb;
20+
public class Jodis implements Closeable {
21+
Logger log = Logger.getLogger(Jodis.class);
22+
23+
/**
24+
* 关闭钩子
25+
*/
26+
private volatile Thread shutdownHook;
27+
28+
private JodisServer jodisServer;
1629

1730
public Jodis() {}
1831

19-
public Jodis(JodisConfig config) throws IOException {
20-
jodisDb = new JodisDb();
32+
public void start(String propsFileName) {
33+
Path path = Paths.get(propsFileName);
34+
if (!Files.exists(path) || !Files.isRegularFile(path)) {
35+
log.error( "ERROR: Jodis config file not exist => '{}', copy one from 'conf/jodis.properties' first.",
36+
path.toAbsolutePath().toString());
37+
System.exit(-1);
38+
}
39+
40+
start(PropertiesUtils.loadProps(propsFileName));
2141
}
2242

23-
public static Jodis create() {
24-
return new Jodis();
43+
public void start(Properties mainProperties) {
44+
final JodisConfig config = new JodisConfig(mainProperties);
45+
start(config);
46+
}
47+
48+
public void start(JodisConfig config) {
49+
jodisServer = new JodisServer(config);
50+
51+
// todo Executor
52+
shutdownHook = new Thread(() -> {
53+
jodisServer.close();
54+
jodisServer.awaitShutdown();
55+
});
56+
Runtime.getRuntime().addShutdownHook(shutdownHook);
57+
58+
jodisServer.startup();
2559
}
2660

27-
public static Jodis create(JodisConfig config) throws IOException {
28-
return new Jodis(config);
61+
public void awaitShutdown() {
62+
if (jodisServer != null) {
63+
jodisServer.awaitShutdown();
64+
}
2965
}
3066

31-
public String process(String request) throws IOException {
32-
return jodisDb.execute(request).toRespString();
67+
@Override
68+
public void close() {
69+
if (shutdownHook != null) {
70+
try {
71+
Runtime.getRuntime().removeShutdownHook(shutdownHook);
72+
} catch (IllegalStateException e) {
73+
//ignore shutting down status
74+
}
75+
shutdownHook.run();
76+
shutdownHook = null;
77+
}
3378
}
3479

35-
public Response process(Request request) throws IOException {
36-
return jodisDb.execute(request);
80+
public static void main(String[] args) {
81+
Jodis jodis = new Jodis();
82+
jodis.start("conf/jodis.properties");
83+
jodis.awaitShutdown();
84+
jodis.close();
3785
}
3886
}

src/main/java/cn/abelib/jodis/JodisConfig.java

Lines changed: 0 additions & 19 deletions
This file was deleted.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
########## Jodis Properties ##########
2+
3+
4+
# jodis server port listen on
5+
jodis.port=6059
6+
7+
# persistence log directory
8+
log.dir=log/
9+
10+
# JDB log file name
11+
log.jdb=default.jdb
12+
13+
# AOF log file name
14+
log.aof=default.aof
15+

src/main/java/cn/abelib/jodis/impl/JodisDb.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package cn.abelib.jodis.impl;
22

33
import cn.abelib.jodis.log.AofWriter;
4+
import cn.abelib.jodis.log.JdbReader;
5+
import cn.abelib.jodis.log.JdbWriter;
46
import cn.abelib.jodis.protocol.ProtocolConstant;
57
import cn.abelib.jodis.protocol.ErrorResponse;
68
import cn.abelib.jodis.protocol.Request;
79
import cn.abelib.jodis.protocol.Response;
810
import cn.abelib.jodis.impl.executor.ExecutorFactory;
11+
import cn.abelib.jodis.server.JodisConfig;
12+
import cn.abelib.jodis.server.JodisException;
913
import cn.abelib.jodis.utils.StringUtils;
1014

1115
import java.io.IOException;
@@ -38,16 +42,36 @@ public class JodisDb {
3842
* 请求队列
3943
*/
4044
private List<Request> requestQueue;
45+
46+
/**
47+
* todo
48+
*/
49+
private JdbReader jdbReader;
50+
51+
private JdbWriter jdbWriter;
52+
4153
/**
4254
* 是否正在进行Aof文件重写
4355
*/
4456
private AtomicBoolean rewriteAof;
4557

58+
public JodisDb(JodisConfig jodisConfig) throws IOException {
59+
jodisCollection = new ConcurrentHashMap<>();
60+
executorFactory = new ExecutorFactory(this);
61+
aofWriter = new AofWriter(jodisConfig.getLogDir(), jodisConfig.getLogWal());
62+
63+
requestQueue = new ArrayList<>(10);
64+
rewriteAof = new AtomicBoolean(false);
65+
}
66+
67+
/**
68+
* default for test
69+
* todo
70+
* @throws IOException
71+
*/
4672
public JodisDb() throws IOException {
4773
jodisCollection = new ConcurrentHashMap<>();
4874
executorFactory = new ExecutorFactory(this);
49-
// todo 配置
50-
aofWriter = new AofWriter("", "");
5175
requestQueue = new ArrayList<>(10);
5276
rewriteAof = new AtomicBoolean(false);
5377
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package cn.abelib.jodis.network;
2+
3+
import cn.abelib.jodis.utils.Closeables;
4+
import cn.abelib.jodis.utils.Logger;
5+
6+
import java.io.Closeable;
7+
import java.io.IOException;
8+
import java.nio.channels.Selector;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
/**
13+
* @Author: abel.huang
14+
* @Date: 2020-08-02 18:58
15+
*/
16+
public abstract class AbstractServerThread implements Runnable, Closeable {
17+
private Selector selector;
18+
protected final CountDownLatch startupLatch = new CountDownLatch(1);
19+
protected final CountDownLatch shutdownLatch = new CountDownLatch(1);
20+
protected final AtomicBoolean alive = new AtomicBoolean(false);
21+
22+
final Logger logger = Logger.getLogger(getClass());
23+
24+
/**
25+
* @return the selector
26+
*/
27+
public Selector getSelector() {
28+
if (selector == null) {
29+
try {
30+
selector = Selector.open();
31+
} catch (IOException e) {
32+
throw new RuntimeException(e);
33+
}
34+
}
35+
return selector;
36+
}
37+
38+
protected void closeSelector() {
39+
Closeables.closeQuietly(selector);
40+
}
41+
42+
@Override
43+
public void close() throws IOException {
44+
alive.set(false);
45+
selector.wakeup();
46+
try {
47+
shutdownLatch.await();
48+
} catch (InterruptedException e) {
49+
logger.error(e);
50+
}
51+
}
52+
53+
protected void startupComplete() {
54+
alive.set(true);
55+
startupLatch.countDown();
56+
}
57+
58+
protected void shutdownComplete() {
59+
shutdownLatch.countDown();
60+
}
61+
62+
protected boolean isRunning() {
63+
return alive.get();
64+
}
65+
66+
public void awaitStartup() throws InterruptedException {
67+
startupLatch.await();
68+
}
69+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package cn.abelib.jodis.network;
2+
3+
4+
import cn.abelib.jodis.utils.Closeables;
5+
import com.google.common.base.Throwables;
6+
7+
import java.io.IOException;
8+
import java.net.InetSocketAddress;
9+
import java.nio.channels.SelectionKey;
10+
import java.nio.channels.ServerSocketChannel;
11+
import java.nio.channels.SocketChannel;
12+
import java.util.Iterator;
13+
14+
/**
15+
* @Author: abel.huang
16+
* @Date: 2020-07-30 22:41
17+
*/
18+
public class Accepter extends AbstractServerThread {
19+
private int port;
20+
private Processor processor;
21+
22+
public Accepter(int port, Processor processor) {
23+
this.port = port;
24+
this.processor = processor;
25+
}
26+
27+
@Override
28+
public void run() {
29+
ServerSocketChannel serverSocketChannel = null;
30+
31+
try {
32+
serverSocketChannel = ServerSocketChannel.open();
33+
// 设置非阻塞
34+
serverSocketChannel.configureBlocking(false);
35+
serverSocketChannel.socket().bind(new InetSocketAddress(port));
36+
serverSocketChannel.register(getSelector(), SelectionKey.OP_ACCEPT);
37+
} catch (IOException e) {
38+
logger.error(e);
39+
}
40+
41+
logger.info("waiting connection on port: {}", port);
42+
43+
startupComplete();
44+
45+
while(isRunning()) {
46+
int ready;
47+
try {
48+
ready = getSelector().select(500L);
49+
} catch (IOException e) {
50+
throw new IllegalStateException(e);
51+
}
52+
if(ready <= 0) {
53+
continue;
54+
}
55+
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
56+
while(iter.hasNext() && isRunning()) {
57+
try {
58+
SelectionKey key = iter.next();
59+
iter.remove();
60+
//
61+
if(key.isAcceptable()) {
62+
accept(key, processor);
63+
}else {
64+
throw new IllegalStateException("Unrecognized key state for acceptor thread.");
65+
}
66+
} catch (Throwable t) {
67+
logger.error("Error in acceptor {}", Throwables.getStackTraceAsString(t));
68+
}
69+
}
70+
}
71+
//run over
72+
logger.info("Closing server socket and selector.");
73+
Closeables.closeQuietly(serverSocketChannel);
74+
Closeables.closeQuietly(getSelector());
75+
shutdownComplete();
76+
}
77+
78+
private void accept(SelectionKey key, Processor processor) throws IOException {
79+
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
80+
serverSocketChannel.socket().setReceiveBufferSize(1024 * 1024);
81+
82+
SocketChannel socketChannel = serverSocketChannel.accept();
83+
socketChannel.configureBlocking(false);
84+
socketChannel.socket().setTcpNoDelay(true);
85+
socketChannel.socket().setSendBufferSize(1024 * 1024);
86+
87+
processor.accept(socketChannel);
88+
}
89+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package cn.abelib.jodis.network;
2+
3+
/**
4+
* @Author: abel.huang
5+
* @Date: 2020-08-03 00:13
6+
*/
7+
public class InvalidRequestException extends RuntimeException {
8+
private static final long serialVersionUID = 1L;
9+
10+
public InvalidRequestException() {
11+
super();
12+
}
13+
14+
public InvalidRequestException(String message, Throwable cause) {
15+
super(message, cause);
16+
}
17+
18+
public InvalidRequestException(String message) {
19+
super(message);
20+
}
21+
22+
public InvalidRequestException(Throwable cause) {
23+
super(cause);
24+
}
25+
}

0 commit comments

Comments
 (0)