Skip to content

Commit eb5e95b

Browse files
authored
Merge pull request apache#20 from chensj2018/8a-jdbc-sink
8a jdbc sink
2 parents ba907e5 + 2fdb301 commit eb5e95b

File tree

7 files changed

+221
-46
lines changed

7 files changed

+221
-46
lines changed

flume-ng-core/src/main/java/org/apache/flume/interceptor/HeaderSplitInterceptor.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,28 +122,26 @@ public void close() {
122122
@Override
123123
public Event intercept(Event event) {
124124
Map<String, String> headers = event.getHeaders();
125-
125+
126126
if (!headers.containsKey(splitKey)) {
127127
logger.error("{} is not in header", splitKey);
128128
return event;
129129
}
130-
130+
131131
Matcher matcher = regex.matcher(headers.get(splitKey));
132132
if (matcher.find()) {
133133
for (int group = 0, count = matcher.groupCount(); group < count; group++) {
134134
int groupIndex = group + 1;
135135
if (groupIndex > serializers.size()) {
136136
if (logger.isDebugEnabled()) {
137-
logger.debug("Skipping group {} to {} due to missing serializer",
138-
group, count);
137+
logger.debug("Skipping group {} to {} due to missing serializer", group, count);
139138
}
140139
break;
141140
}
142-
headers.put(serializers.get(group),matcher.group(groupIndex));
141+
headers.put(serializers.get(group), matcher.group(groupIndex));
143142
}
144-
}
145-
else {
146-
logger.warn("no mathch any values");
143+
} else {
144+
logger.warn("no mathch any values");
147145
}
148146
return event;
149147
}

flume-ng-core/src/main/java/org/apache/flume/sink/gbase/GBase8aSink.java

Lines changed: 112 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,20 @@ Licensed to the Apache Software Foundation (ASF) under one or more
1818

1919
package org.apache.flume.sink.gbase;
2020

21+
import java.net.InetAddress;
22+
import java.net.UnknownHostException;
23+
import java.sql.Connection;
24+
import java.sql.DriverManager;
25+
import java.sql.SQLException;
26+
import java.sql.Statement;
27+
2128
import org.apache.flume.Channel;
2229
import org.apache.flume.Context;
2330
import org.apache.flume.CounterGroup;
2431
import org.apache.flume.EventDeliveryException;
2532
import org.apache.flume.conf.Configurable;
2633
import org.apache.flume.sink.AbstractSink;
34+
import org.apache.flume.source.http.HTTPSourceConfigurationConstants;
2735
import org.slf4j.Logger;
2836
import org.slf4j.LoggerFactory;
2937

@@ -39,9 +47,16 @@ public class GBase8aSink extends AbstractSink implements Configurable {
3947
private static final Logger logger = LoggerFactory.getLogger(GBase8aSink.class);
4048

4149
private PassiveHttpSink httpSink;
50+
private String connectUrl;
51+
private String driverClassName;
52+
private String userName;
53+
private String passWord;
54+
private String loadSql;
55+
private int loadInterval = 0;
56+
private Connection conn = null;
57+
private Statement stm = null;
4258

4359
private CounterGroup counterGroup;
44-
private int batchSize;
4560

4661
public GBase8aSink() {
4762
counterGroup = new CounterGroup();
@@ -52,53 +67,135 @@ public GBase8aSink() {
5267
public void configure(Context context) {
5368
httpSink.configure(context);
5469

55-
batchSize = context.getInteger(GBase8aSinkConstants.BATCH_SIZE,
56-
GBase8aSinkConstants.DFLT_BATCH_SIZE);
57-
logger.debug(this.getName() + " " + "batch size set to " + String.valueOf(batchSize));
58-
Preconditions.checkArgument(batchSize > 0, "Batch size must be > 0");
70+
connectUrl = context.getString(GBase8aSinkConstants.CONNECTION_STRING);
71+
Preconditions.checkArgument(connectUrl != null && connectUrl.trim().length() != 0,
72+
"connect url must be a string");
73+
74+
userName = context.getString(GBase8aSinkConstants.CONNECTION_USERNAME);
75+
passWord = context.getString(GBase8aSinkConstants.CONNECTION_PASSWORD);
76+
driverClassName = context.getString(GBase8aSinkConstants.CONNECTION_DRIVER_CLASS,
77+
GBase8aSinkConstants.DFLT_DRIVER_CLASS);
78+
79+
loadInterval = context.getInteger(GBase8aSinkConstants.LOAD_INTERVAL,
80+
GBase8aSinkConstants.DFLT_LOAD_INTERVAL);
81+
Preconditions.checkArgument(loadInterval >= 0 && loadInterval < 30,
82+
"loadInterval must be in [0,30).");
83+
84+
loadSql = context.getString(GBase8aSinkConstants.SQL_STRING);
85+
Preconditions.checkArgument(loadSql != null && loadSql.trim().length() != 0,
86+
"load sql must be a string");
87+
try {
88+
Integer port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
89+
/* 获取本机hostname对应的IP */
90+
String IP = InetAddress.getLocalHost().getHostAddress();
91+
logger.info("local ip : {}, port : {}", IP, port);
92+
loadSql = loadSql.replaceAll("\\$\\{localhost\\}", IP + ":" + port);
93+
} catch (UnknownHostException e) {
94+
logger.error("Error while get local IP. Exception follows.", e);
95+
}
5996
}
6097

6198
@Override
6299
public Status process() throws EventDeliveryException {
63100
Status status = Status.READY;
64-
65-
/*
66-
* TODO 等待合适时间通过 jdbc 通知 8a 来读取数据
67-
*/
101+
102+
try {
103+
stm.execute(loadSql);
104+
if (stm.getUpdateCount() == 0) {
105+
Thread.sleep(loadInterval * 1000);
106+
}
107+
} catch (Exception e) {
108+
logger.error("Error while execute GBase8a. Exception follows.", e);
109+
reconnectGBase8a();
110+
status = Status.BACKOFF;
111+
}
68112

69113
return status;
70114
}
71115

116+
/*
117+
* (non-Javadoc)
118+
*
119+
* @see java.lang.Object#toString()
120+
*/
121+
@Override
122+
public String toString() {
123+
return "GBase8aSink [httpSink=" + httpSink + ", connectUrl=" + connectUrl + ", driverClassName="
124+
+ driverClassName + ", userName=" + userName + ", passWord=" + passWord + ", loadSql="
125+
+ loadSql + ", conn=" + conn + ", stm=" + stm + ", counterGroup=" + counterGroup + "]";
126+
}
127+
128+
/**
129+
* connect to GBase 8a MPP cluster.
130+
*
131+
* @author chensj
132+
*/
133+
private void connectGBase8a() {
134+
try {
135+
Class.forName(driverClassName);
136+
conn = DriverManager.getConnection(connectUrl, userName, passWord);
137+
stm = conn.createStatement();
138+
logger.info("connected GBase8a.");
139+
} catch (ClassNotFoundException e) {
140+
logger.error("Error while Connecting GBase8a. Exception follows.", e);
141+
} catch (SQLException e) {
142+
logger.error("Error while getConnection GBase8a. Exception follows.", e);
143+
}
144+
}
145+
146+
private void reconnectGBase8a() {
147+
disconnectGBase8a();
148+
connectGBase8a();
149+
}
150+
72151
@Override
73152
public void start() {
74153
logger.info("Starting {}...", this);
75154

76155
counterGroup.setName(this.getName());
77-
super.start();
156+
connectGBase8a();
78157
httpSink.start();
79-
158+
super.start();
80159
logger.info("GBase 8a sink {} started.", getName());
81160
}
82161

83162
@Override
84163
public void stop() {
85164
logger.info("GBase 8a sink {} stopping...", getName());
165+
disconnectGBase8a();
86166

87167
httpSink.stop();
88168
super.stop();
89169

90170
logger.info("GBase 8a sink {} stopped. Event metrics: {}", getName(), counterGroup);
91171
}
92172

93-
@Override
94-
public String toString() {
95-
return "GBase8a " + getName() + " { batchSize: " + batchSize + " }";
173+
/**
174+
*
175+
*/
176+
private void disconnectGBase8a() {
177+
if (stm != null) {
178+
try {
179+
if (!stm.isClosed()) {
180+
stm.close();
181+
}
182+
} catch (Exception e) {
183+
logger.error("Error while close stm. Exception follows.", e);
184+
}
185+
if (conn != null) {
186+
try {
187+
conn.close();
188+
} catch (SQLException e) {
189+
logger.error("Error while close connect. Exception follows.", e);
190+
}
191+
}
192+
}
193+
96194
}
97195

98196
@Override
99197
public synchronized void setChannel(Channel channel) {
100198
httpSink.setChannel(channel);
101199
super.setChannel(channel);
102200
}
103-
104201
}

flume-ng-core/src/main/java/org/apache/flume/sink/gbase/GBase8aSinkConstants.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@ public class GBase8aSinkConstants {
3030

3131
public static final String GBASE8A_PREFIX = "gbase8a.";
3232

33-
/* 给 8a 发送 load data 指令的时间间隔 (单位: 秒) */
34-
public static final String INTERVAL = "interval";
35-
3633
/* 8a 连接参数 */
3734
public static final String CONNECTION_STRING = GBASE8A_PREFIX + "jdbcUrl";
3835
public static final String CONNECTION_USERNAME = GBASE8A_PREFIX + "username";
3936
public static final String CONNECTION_PASSWORD = GBASE8A_PREFIX + "password";
40-
41-
/* 8a 加载语句 (直接设置 SQL, 同时支持自动替换 PassiveHttpSink 服务路径 ${URI}) */
37+
public static final String CONNECTION_DRIVER_CLASS = GBASE8A_PREFIX + "driverClass";
38+
39+
/* 8a 加载语句 (直接设置 SQL, 同时支持自动替换 ${localhost}为本机ip+http服务的port) */
4240
public static final String SQL_STRING = GBASE8A_PREFIX + "loadSql";
43-
41+
/* 8a 加载语句的执行间隔(单位:秒);只有在flume channel空闲是有效*/
42+
public static final String LOAD_INTERVAL = GBASE8A_PREFIX + "loadInterval";
43+
4444
/* handler 参数 (设置时需要加 "handler." 前缀) */
4545
public static final String BATCH_SIZE = "batchSize";
4646
public static final String CHARACTER_ENCODING = "characterEncoding";
@@ -52,6 +52,8 @@ public class GBase8aSinkConstants {
5252
/* 参数默认值 */
5353
public static final int DFLT_INTERVAL = 10000; // ms
5454
public static final String DFLT_HANDLER = "org.apache.flume.sink.gbase.PassiveHttpSinkBlobHandler";
55+
public static final String DFLT_DRIVER_CLASS = "com.gbase.jdbc.Driver";
56+
public static final int DFLT_LOAD_INTERVAL = 5; // s
5557

5658
/* handler 参数默认值 */
5759
public static final int DFLT_BATCH_SIZE = 10000; // events

flume-ng-core/src/main/java/org/apache/flume/sink/gbase/PassiveHttpSink.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more
2525
import java.util.List;
2626
import java.util.Map;
2727

28+
import javax.servlet.ServletException;
2829
import javax.servlet.http.HttpServlet;
2930
import javax.servlet.http.HttpServletRequest;
3031
import javax.servlet.http.HttpServletResponse;
@@ -266,6 +267,26 @@ public void doPost(HttpServletRequest request, HttpServletResponse response)
266267
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
267268
doPost(request, response);
268269
}
270+
271+
@Override
272+
protected void doHead(HttpServletRequest request, HttpServletResponse response)
273+
throws ServletException, IOException {
274+
try {
275+
handler.handle(request, response);
276+
} catch (HTTPBadRequestException ex) {
277+
LOG.warn("Received bad request from client. ", ex);
278+
response.sendError(HttpServletResponse.SC_BAD_REQUEST,
279+
"Bad request from client. " + ex.getMessage());
280+
return;
281+
} catch (Exception ex) {
282+
LOG.warn("Unexpected error while sending events. ", ex);
283+
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
284+
"Unexpected error while sending events. " + ex.getMessage());
285+
return;
286+
}
287+
288+
response.flushBuffer();
289+
}
269290
}
270291

271292
@Override

0 commit comments

Comments
 (0)