-
Notifications
You must be signed in to change notification settings - Fork 2k
[Improve][Connector-V2][StarRocks]StarRocks Sink connector support 2pc and eos #4752
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
531651225
wants to merge
21
commits into
apache:dev
Choose a base branch
from
531651225:starrocks_sink_connector_eos
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
da368c7
improve sink to eos
68948ad
Merge branch 'dev' into starrocks_sink_connector_eos
89f0717
fix error on get starrocks source typeInfo
3778ee1
fix error on get starrocks source typeInfo
c760538
Merge branch 'dev' of https://github.com/531651225/incubator-seatunne…
2023f18
fix-SinkWriter-checkpointid-init problem
32ba120
fix-SinkWriter-checkpointid-init problem
9946b3b
merge dev
5f967e6
Merge branch 'dev' of https://github.com/531651225/incubator-seatunne…
d6230cc
Merge branch 'dev' into starrocks_sink_connector_eos
b7e035d
merge dev
5e45652
merge dev
6b5e930
Merge branch 'dev' of https://github.com/531651225/incubator-seatunne…
44ba6ec
Merge branch 'dev' into starrocks_sink_connector_eos
40f8f70
fix code style
9bc1a1f
merge dev
781b2eb
merge dev
9ed2c71
fix code style
92ef83e
fix code style
6a10b41
fix code problem
a06bfb3
fix doc problem
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,15 +17,13 @@ | |
|
||
package org.apache.seatunnel.connectors.seatunnel.starrocks.client; | ||
|
||
import org.apache.seatunnel.common.utils.JsonUtils; | ||
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException; | ||
|
||
import org.apache.http.HttpEntity; | ||
import org.apache.http.HttpStatus; | ||
import org.apache.http.client.config.RequestConfig; | ||
import org.apache.http.client.methods.CloseableHttpResponse; | ||
import org.apache.http.client.methods.HttpGet; | ||
import org.apache.http.client.methods.HttpPost; | ||
import org.apache.http.client.methods.HttpPut; | ||
import org.apache.http.client.methods.HttpRequestBase; | ||
import org.apache.http.entity.ByteArrayEntity; | ||
import org.apache.http.impl.client.CloseableHttpClient; | ||
import org.apache.http.impl.client.DefaultRedirectStrategy; | ||
|
@@ -38,8 +36,10 @@ | |
import java.io.IOException; | ||
import java.net.HttpURLConnection; | ||
import java.net.URL; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
import static org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode.FLUSH_DATA_FAILED; | ||
|
||
@Slf4j | ||
public class HttpHelper { | ||
|
@@ -77,100 +77,58 @@ public String doHttpPost(String postUrl, Map<String, String> header, String post | |
} | ||
} | ||
|
||
public String doHttpGet(String getUrl) throws IOException { | ||
log.info("Executing GET from {}.", getUrl); | ||
try (CloseableHttpClient httpclient = buildHttpClient()) { | ||
HttpGet httpGet = new HttpGet(getUrl); | ||
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { | ||
HttpEntity respEntity = resp.getEntity(); | ||
if (null == respEntity) { | ||
log.warn("Request failed with empty response."); | ||
return null; | ||
} | ||
return EntityUtils.toString(respEntity); | ||
public String doHttpExecute(HttpClientBuilder clientBuilder, HttpRequestBase httpRequestBase) | ||
throws IOException { | ||
if (Objects.isNull(clientBuilder)) clientBuilder = getDefaultClientBuilder(); | ||
try (CloseableHttpClient client = clientBuilder.build()) { | ||
try (CloseableHttpResponse response = client.execute(httpRequestBase)) { | ||
return parseHttpResponse(response, httpRequestBase.getMethod()); | ||
} | ||
} | ||
} | ||
|
||
public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header) | ||
throws IOException { | ||
log.info("Executing GET from {}.", getUrl); | ||
try (CloseableHttpClient httpclient = HttpClients.createDefault()) { | ||
HttpGet httpGet = new HttpGet(getUrl); | ||
if (null != header) { | ||
for (Map.Entry<String, String> entry : header.entrySet()) { | ||
httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue())); | ||
} | ||
} | ||
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { | ||
HttpEntity respEntity = getHttpEntity(resp); | ||
if (null == respEntity) { | ||
log.warn("Request failed with empty response."); | ||
return null; | ||
} | ||
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); | ||
} | ||
public String parseHttpResponse(CloseableHttpResponse response, String requestType) | ||
throws StarRocksConnectorException { | ||
int code = response.getStatusLine().getStatusCode(); | ||
if (307 == code) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you change the response code number to same meaning enum? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
String errorMsg = | ||
String.format( | ||
"Request %s failed because http response code is 307 which means 'Temporary Redirect'. " | ||
+ "This can happen when FE responds the request slowly , you should find the reason first. The reason may be " | ||
+ "StarRocks FE/ENGINE GC, network delay, or others. response status line: %s", | ||
requestType, response.getStatusLine()); | ||
log.error("{}", errorMsg); | ||
throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg); | ||
} else if (200 != code) { | ||
String errorMsg = | ||
String.format( | ||
"Request %s failed because http response code is not 200. response status line: %s", | ||
requestType, response.getStatusLine()); | ||
log.error("{}", errorMsg); | ||
throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg); | ||
531651225 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header) | ||
throws IOException { | ||
final HttpClientBuilder httpClientBuilder = | ||
HttpClients.custom() | ||
.setRedirectStrategy( | ||
new DefaultRedirectStrategy() { | ||
@Override | ||
protected boolean isRedirectable(String method) { | ||
return true; | ||
} | ||
}); | ||
try (CloseableHttpClient httpclient = httpClientBuilder.build()) { | ||
HttpPut httpPut = new HttpPut(url); | ||
if (null != header) { | ||
for (Map.Entry<String, String> entry : header.entrySet()) { | ||
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); | ||
} | ||
} | ||
httpPut.setEntity(new ByteArrayEntity(data)); | ||
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); | ||
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { | ||
int code = resp.getStatusLine().getStatusCode(); | ||
if (HttpStatus.SC_OK != code) { | ||
String errorText; | ||
try { | ||
HttpEntity respEntity = resp.getEntity(); | ||
errorText = EntityUtils.toString(respEntity); | ||
} catch (Exception err) { | ||
errorText = "find errorText failed: " + err.getMessage(); | ||
} | ||
log.warn("Request failed with code:{}, err:{}", code, errorText); | ||
Map<String, Object> errorMap = new HashMap<>(); | ||
errorMap.put("Status", "Fail"); | ||
errorMap.put("Message", errorText); | ||
return errorMap; | ||
} | ||
HttpEntity respEntity = resp.getEntity(); | ||
if (null == respEntity) { | ||
log.warn("Request failed with empty response."); | ||
return null; | ||
} | ||
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); | ||
} | ||
HttpEntity respEntity = response.getEntity(); | ||
if (respEntity == null) { | ||
String errorMsg = | ||
String.format( | ||
"Request %s failed because response entity is null. response status line: %s", | ||
requestType, response.getStatusLine()); | ||
log.error("{}", errorMsg); | ||
throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg); | ||
531651225 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
private CloseableHttpClient buildHttpClient() { | ||
final HttpClientBuilder httpClientBuilder = | ||
HttpClients.custom() | ||
.setRedirectStrategy( | ||
new DefaultRedirectStrategy() { | ||
@Override | ||
protected boolean isRedirectable(String method) { | ||
return true; | ||
} | ||
}); | ||
return httpClientBuilder.build(); | ||
try { | ||
return EntityUtils.toString(respEntity); | ||
} catch (Exception e) { | ||
String errorMsg = | ||
String.format( | ||
"Request %s failed because fail to convert response entity to string. " | ||
+ "response status line: %s, response entity: %s", | ||
requestType, response.getStatusLine(), response.getEntity()); | ||
log.error("{}", errorMsg, e); | ||
throw new StarRocksConnectorException(FLUSH_DATA_FAILED, errorMsg, e); | ||
} | ||
} | ||
|
||
public boolean tryHttpConnection(String host) { | ||
|
@@ -181,9 +139,20 @@ public boolean tryHttpConnection(String host) { | |
co.connect(); | ||
co.disconnect(); | ||
return true; | ||
} catch (Exception e1) { | ||
log.warn("Failed to connect to address:{}", host, e1); | ||
} catch (Exception e) { | ||
log.warn("Failed to connect to address:{}", host, e); | ||
return false; | ||
} | ||
} | ||
|
||
private HttpClientBuilder getDefaultClientBuilder() { | ||
return HttpClients.custom() | ||
.setRedirectStrategy( | ||
new DefaultRedirectStrategy() { | ||
@Override | ||
protected boolean isRedirectable(String method) { | ||
return true; | ||
} | ||
}); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your suggestion, fix it