Skip to content

[Improve][Connector-V2][StarRocks]StarRocks Sink connector support 2pc and eos #4752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@
Used to send data to StarRocks. Supports both streaming and batch mode.
Internally, the StarRocks sink connector caches data and imports it in batches via Stream Load.

:::tip

Version Supported

* exactly-once is supported when `StarRocks version is >= 2.4.x`

:::

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

## Options
Expand All @@ -30,6 +38,8 @@ The internal implementation of StarRocks sink connector is cached and imported b
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| enable_upsert_delete | boolean | no | false |
| enable-2pc | boolean | no | false |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| enable-2pc | boolean | no | false |
| exactly-once | boolean | no | false |

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your suggestion, fix it

| flush_frequency_ms | long | no | 50 |
| save_mode_create_template | string | no | see below |
| starrocks.config | map | no | - |

Expand Down Expand Up @@ -89,6 +99,14 @@ The amount of time to wait before attempting to retry a request to `StarRocks`

Whether to enable upsert/delete, only supports PrimaryKey model.

### enable-2pc [bool]

Whether to enable two-phase commit (2pc) to ensure exactly-once semantics. The default is false. For details on two-phase commit, please refer to [here](https://docs.starrocks.io/zh-cn/latest/loading/Stream_Load_transaction_interface).

### flush_frequency_ms [long]

The interval, in milliseconds, at which a flush is triggered for batch writing when exactly-once (EOS) semantics are enabled.

### save_mode_create_template [string]

We use templates to automatically create starrocks tables,
Expand Down Expand Up @@ -199,6 +217,22 @@ sink {
}
```

Load data with exactly-once (EOS) semantics

```hocon
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
enable-2pc = true
flush_frequency_ms= 30000
}
}
```

## Changelog

### next version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;

import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
Expand All @@ -38,8 +36,10 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode.FLUSH_DATA_FAILED;

@Slf4j
public class HttpHelper {
Expand Down Expand Up @@ -77,100 +77,58 @@ public String doHttpPost(String postUrl, Map<String, String> header, String post
}
}

public String doHttpGet(String getUrl) throws IOException {
log.info("Executing GET from {}.", getUrl);
try (CloseableHttpClient httpclient = buildHttpClient()) {
HttpGet httpGet = new HttpGet(getUrl);
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
log.warn("Request failed with empty response.");
return null;
}
return EntityUtils.toString(respEntity);
/**
 * Executes the given HTTP request and returns the parsed response body.
 *
 * <p>A new client is built for every call and is closed, together with the response, before
 * this method returns.
 *
 * @param clientBuilder builder used to create the HTTP client; when {@code null}, the builder
 *     returned by {@code getDefaultClientBuilder()} is used instead
 * @param httpRequestBase the request to execute; its method name is passed on to
 *     {@code parseHttpResponse} as the request type
 * @return the value produced by {@code parseHttpResponse} for the received response
 * @throws IOException if building the client, executing the request, or closing a resource fails
 */
public String doHttpExecute(HttpClientBuilder clientBuilder, HttpRequestBase httpRequestBase)
        throws IOException {
    // Fall back to the default builder without reassigning the caller-supplied parameter.
    final HttpClientBuilder effectiveBuilder =
            clientBuilder != null ? clientBuilder : getDefaultClientBuilder();
    // Resources are closed in reverse declaration order: response first, then client.
    try (CloseableHttpClient client = effectiveBuilder.build();
            CloseableHttpResponse response = client.execute(httpRequestBase)) {
        return parseHttpResponse(response, httpRequestBase.getMethod());
    }
}

public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header)
throws IOException {
log.info("Executing GET from {}.", getUrl);
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(getUrl);
if (null != header) {
for (Map.Entry<String, String> entry : header.entrySet()) {
httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
HttpEntity respEntity = getHttpEntity(resp);
if (null == respEntity) {
log.warn("Request failed with empty response.");
return null;
}
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
}
public String parseHttpResponse(CloseableHttpResponse response, String requestType)
throws StarRocksConnectorException {
int code = response.getStatusLine().getStatusCode();
if (307 == code) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change the response code number to same meaning enum?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change the response code number to same meaning enum?
Thanks, fixed it

String errorMsg =
String.format(
"Request %s failed because http response code is 307 which means 'Temporary Redirect'. "
+ "This can happen when FE responds the request slowly , you should find the reason first. The reason may be "
+ "StarRocks FE/ENGINE GC, network delay, or others. response status line: %s",
requestType, response.getStatusLine());
log.error("{}", errorMsg);
throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg);
} else if (200 != code) {
String errorMsg =
String.format(
"Request %s failed because http response code is not 200. response status line: %s",
requestType, response.getStatusLine());
log.error("{}", errorMsg);
throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg);
}
}

@SuppressWarnings("unchecked")
public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header)
throws IOException {
final HttpClientBuilder httpClientBuilder =
HttpClients.custom()
.setRedirectStrategy(
new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
HttpPut httpPut = new HttpPut(url);
if (null != header) {
for (Map.Entry<String, String> entry : header.entrySet()) {
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
httpPut.setEntity(new ByteArrayEntity(data));
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
int code = resp.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK != code) {
String errorText;
try {
HttpEntity respEntity = resp.getEntity();
errorText = EntityUtils.toString(respEntity);
} catch (Exception err) {
errorText = "find errorText failed: " + err.getMessage();
}
log.warn("Request failed with code:{}, err:{}", code, errorText);
Map<String, Object> errorMap = new HashMap<>();
errorMap.put("Status", "Fail");
errorMap.put("Message", errorText);
return errorMap;
}
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
log.warn("Request failed with empty response.");
return null;
}
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
}
HttpEntity respEntity = response.getEntity();
if (respEntity == null) {
String errorMsg =
String.format(
"Request %s failed because response entity is null. response status line: %s",
requestType, response.getStatusLine());
log.error("{}", errorMsg);
throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg);
}
}

private CloseableHttpClient buildHttpClient() {
final HttpClientBuilder httpClientBuilder =
HttpClients.custom()
.setRedirectStrategy(
new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
return httpClientBuilder.build();
try {
return EntityUtils.toString(respEntity);
} catch (Exception e) {
String errorMsg =
String.format(
"Request %s failed because fail to convert response entity to string. "
+ "response status line: %s, response entity: %s",
requestType, response.getStatusLine(), response.getEntity());
log.error("{}", errorMsg, e);
throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg, e);
}
}

public boolean tryHttpConnection(String host) {
Expand All @@ -181,9 +139,20 @@ public boolean tryHttpConnection(String host) {
co.connect();
co.disconnect();
return true;
} catch (Exception e1) {
log.warn("Failed to connect to address:{}", host, e1);
} catch (Exception e) {
log.warn("Failed to connect to address:{}", host, e);
return false;
}
}

/**
 * Creates the client builder used when a caller does not supply one.
 *
 * <p>The builder is configured with a redirect strategy that reports every HTTP method as
 * redirectable.
 *
 * @return a fresh, custom {@link HttpClientBuilder} with the redirect strategy installed
 */
private HttpClientBuilder getDefaultClientBuilder() {
    final HttpClientBuilder builder = HttpClients.custom();
    final DefaultRedirectStrategy redirectEveryMethod =
            new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // No method is excluded from redirect handling.
                    return true;
                }
            };
    return builder.setRedirectStrategy(redirectEveryMethod);
}
}
Loading