Skip to content

Commit 1cf7200

Browse files
authored
完成 PassiveHttpSink 部分 (apache#17) (apache#18)
* initial * + * = * + * * * + * PassiveHttpSink passed unit tests * 支持 内容头字符,尾字符,事件分隔符。 * format style
1 parent 6b68cc3 commit 1cf7200

File tree

7 files changed

+1041
-1
lines changed

7 files changed

+1041
-1
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
limitations under the License.
17+
*/
18+
19+
package org.apache.flume.sink.gbase;
20+
21+
import org.apache.flume.Channel;
22+
import org.apache.flume.Context;
23+
import org.apache.flume.CounterGroup;
24+
import org.apache.flume.EventDeliveryException;
25+
import org.apache.flume.conf.Configurable;
26+
import org.apache.flume.sink.AbstractSink;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import com.google.common.base.Preconditions;
31+
32+
/**
33+
* A Flume Sink that can publish messages to GBase 8a MPP cluster.
34+
*
35+
* @author He Jiang
36+
*/
37+
public class GBase8aSink extends AbstractSink implements Configurable {
38+
39+
private static final Logger logger = LoggerFactory.getLogger(GBase8aSink.class);
40+
41+
private PassiveHttpSink httpSink;
42+
43+
private CounterGroup counterGroup;
44+
private int batchSize;
45+
46+
public GBase8aSink() {
47+
counterGroup = new CounterGroup();
48+
httpSink = new PassiveHttpSink();
49+
}
50+
51+
@Override
52+
public void configure(Context context) {
53+
httpSink.configure(context);
54+
55+
batchSize = context.getInteger(GBase8aSinkConstants.BATCH_SIZE,
56+
GBase8aSinkConstants.DFLT_BATCH_SIZE);
57+
logger.debug(this.getName() + " " + "batch size set to " + String.valueOf(batchSize));
58+
Preconditions.checkArgument(batchSize > 0, "Batch size must be > 0");
59+
}
60+
61+
@Override
62+
public Status process() throws EventDeliveryException {
63+
Status status = Status.READY;
64+
65+
/*
66+
* TODO 等待合适时间通过 jdbc 通知 8a 来读取数据
67+
*/
68+
69+
return status;
70+
}
71+
72+
@Override
73+
public void start() {
74+
logger.info("Starting {}...", this);
75+
76+
counterGroup.setName(this.getName());
77+
super.start();
78+
httpSink.start();
79+
80+
logger.info("GBase 8a sink {} started.", getName());
81+
}
82+
83+
@Override
84+
public void stop() {
85+
logger.info("GBase 8a sink {} stopping...", getName());
86+
87+
httpSink.stop();
88+
super.stop();
89+
90+
logger.info("GBase 8a sink {} stopped. Event metrics: {}", getName(), counterGroup);
91+
}
92+
93+
@Override
94+
public String toString() {
95+
return "GBase8a " + getName() + " { batchSize: " + batchSize + " }";
96+
}
97+
98+
@Override
99+
public synchronized void setChannel(Channel channel) {
100+
httpSink.setChannel(channel);
101+
super.setChannel(channel);
102+
}
103+
104+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
limitations under the License.
17+
*/
18+
19+
package org.apache.flume.sink.gbase;
20+
21+
import com.google.common.base.Charsets;
22+
23+
/**
24+
* GBase8aSinkConstants
25+
*
26+
* @author He Jiang
27+
*
28+
*/
29+
public class GBase8aSinkConstants {
30+
31+
public static final String GBASE8A_PREFIX = "gbase8a.";
32+
33+
/* 给 8a 发送 load data 指令的时间间隔 (单位: 秒) */
34+
public static final String INTERVAL = "interval";
35+
36+
/* 8a 连接参数 */
37+
public static final String CONNECTION_STRING = GBASE8A_PREFIX + "jdbcUrl";
38+
public static final String CONNECTION_USERNAME = GBASE8A_PREFIX + "username";
39+
public static final String CONNECTION_PASSWORD = GBASE8A_PREFIX + "password";
40+
41+
/* 8a 加载语句 (直接设置 SQL, 同时支持自动替换 PassiveHttpSink 服务路径 ${URI}) */
42+
public static final String SQL_STRING = GBASE8A_PREFIX + "loadSql";
43+
44+
/* handler 参数 (设置时需要加 "handler." 前缀) */
45+
public static final String BATCH_SIZE = "batchSize";
46+
public static final String CHARACTER_ENCODING = "characterEncoding";
47+
public static final String CONTENT_TYPE = "contentType";
48+
public static final String CONTENT_PREFIX = "contentPrefix";
49+
public static final String CONTENT_SURFFIX = "contentSurffix";
50+
public static final String CONTENT_SEPARATOR = "contentSeparator";
51+
52+
/* 参数默认值 */
53+
public static final int DFLT_INTERVAL = 10000; // ms
54+
public static final String DFLT_HANDLER = "org.apache.flume.sink.gbase.PassiveHttpSinkBlobHandler";
55+
56+
/* handler 参数默认值 */
57+
public static final int DFLT_BATCH_SIZE = 10000; // events
58+
public static final String DFLT_CHARACTER_ENCODING = Charsets.UTF_8.name();
59+
public static final String DFLT_CONTENT_TYPE = "application/json";
60+
public static final String DFLT_CONTENT_PREFIX = null;
61+
public static final String DFLT_CONTENT_SURFFIX = null;
62+
public static final String DFLT_CONTENT_SEPARATOR = null;
63+
}

0 commit comments

Comments
 (0)