[Feature][Connector-V2] Add Apache InLong Connector Support #9250
base: dev
Pull Request Overview
This PR adds support for Apache InLong as a new sink connector in SeaTunnel, enabling users to write processed data into InLong clusters. Key changes include the implementation of core sink writer logic with retry and resend mechanisms, definition of state and commit objects, and integration with InLong’s Java SDK.
- Added a new module for the InLong sink connector with complete sink writer, committer, and factory implementations.
- Introduced utility methods and thread management for sending data with retry logic.
- Provided unit tests and configuration properties for InLong connector functionality.
Reviewed Changes
Copilot reviewed 15 out of 17 changed files in this pull request and generated no comments.
File | Description |
---|---|
seatunnel-connectors-v2/connector-inlong/src/test/java/org/apache/seatunnel/connectors/seatunnel/inlong/InlongFactoryTest.java | Basic unit tests for the InlongSinkFactory. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/util/InlongUtil.java | Utility class for sleep methods with logging. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/state/*.java | New state and commit info classes required for sink state management. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/SenderMessage.java | Defined the message container for batch sending. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongThreadFactory.java | New thread factory implementation for the connector’s internal threads. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java | Core sink writer implementation featuring message batching, retry, and resend logic. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkFactory.java | Factory to create the InLong sink and define required options. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkCommitter.java | Minimal committer implementation returning empty commit info. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSink.java | Main sink implementation integrating writer, state serializer, and committer. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/exception/*.java | New exception and error code classes for error handling in the connector. |
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/config/*.java | Configuration property definitions and semantics enums for the InLong sink connector. |
Files not reviewed (2)
- plugin-mapping.properties: Language not supported
- seatunnel-connectors-v2/connector-inlong/pom.xml: Language not supported
Comments suppressed due to low confidence (2)
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:290
- The current logic removes an extra element from rowQue when a message exceeds batchSendLen, which may unintentionally drop the subsequent message. Consider removing the redundant rowQue.remove() call so that only the oversized message is dropped.
if (peekMessageLength > batchSendLen) { LOG.warn(...); rowQue.remove(); break; }
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:247
- In the retry loop for sending messages, the code does not break out after exceeding MAX_RETRY, which may lead to an infinite loop when sending continually fails. Consider adding a break or alternative error handling to prevent the potential deadlock.
if (retry > MAX_RETRY) { ... }
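The retry comment above can be illustrated with a bounded loop. This is a minimal sketch, not the PR's code: the class `BoundedRetry`, the method `sendWithRetry`, and the `backoffMs` parameter are hypothetical names, and only the `MAX_RETRY = 5` value is taken from the constants quoted in this review.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not from the PR): one initial attempt plus at most MAX_RETRY retries. */
final class BoundedRetry {
    static final int MAX_RETRY = 5; // value quoted in the reviewed code

    private BoundedRetry() {}

    /**
     * Calls sendOnce until it succeeds or the retry budget is spent.
     * Returns the number of attempts used, or -1 if every attempt failed
     * or the thread was interrupted while backing off.
     */
    static int sendWithRetry(BooleanSupplier sendOnce, long backoffMs) {
        for (int retry = 0; retry <= MAX_RETRY; retry++) {
            if (sendOnce.getAsBoolean()) {
                return retry + 1;
            }
            // Back off only if another attempt will follow.
            if (retry < MAX_RETRY && !sleepQuietly(backoffMs)) {
                break;
            }
        }
        return -1; // the loop always terminates, so the caller can handle the failure
    }

    private static boolean sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Returning a failure value instead of looping leaves the decision (resend queue, exception, or drop) to the caller, which is one way to avoid the unbounded loop the comment warns about.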
Thanks @luchunliang. Please add an e2e test case.
+1
ab421d9 to ee50fc1
We have completed end-to-end testing offline. We have also added test cases to this PR, modeled on those in connector-activemq and connector-aerospike.
Pull Request Overview
Add an Apache InLong Sink connector module to SeaTunnel, including configuration, factory, writer, committer, and registration.
- Introduce connector-inlong module with core sink implementation and state classes
- Add configuration options, semantics enum, and error handling for InLong SDK
- Register connector in plugin mapping and provide a simple factory test
Reviewed Changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 6 comments.
File | Description |
---|---|
InlongFactoryTest.java | Add basic factory identifier tests |
util/InlongUtil.java | Add sleep helper with exception handling |
state/InlongSinkState.java | Define empty sink state class |
state/InlongCommitInfo.java | Define commit info class |
state/InlongAggregatedCommitInfo.java | Define aggregated commit info class |
sink/SenderMessage.java | Represent batched messages for sending |
sink/InlongThreadFactory.java | Create named threads with placeholder tags |
sink/InlongSinkWriter.java | Implement async sink writer with retry logic |
sink/InlongSinkFactory.java | Provide table sink factory |
sink/InlongSinkCommitter.java | Define no-op committer |
sink/InlongSink.java | Tie writer and committer into SeaTunnel API |
exception/InlongException.java | Wrap SeaTunnel runtime exceptions |
exception/InlongErrorCode.java | Enumerate connector-specific error codes |
config/SinkProperties.java | Define connector configuration options |
config/InlongSemantics.java | Define delivery semantics enum |
connector-inlong/pom.xml | Add module dependencies and metadata |
plugin-mapping.properties | Register seatunnel.sink.Inlong connector |
Comments suppressed due to low confidence (5)
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/config/InlongSemantics.java:26
- The enum value `NON` is ambiguous; consider renaming to `NONE` or `NO_SEMANTICS` for clarity.
NON,
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/config/SinkProperties.java:50
- Update the description to reference InLong instead of Pulsar to match this connector.
.withDescription("Configure which fields are used as the key of the pulsar message.");
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongThreadFactory.java:33
- [nitpick] Avoid Hungarian prefix `m`; rename `mThreadNum` to `threadCounter` or similar for consistency.
private final AtomicInteger mThreadNum = new AtomicInteger(1);
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:89
- [nitpick] Field `EXECUTOR_SERVICE` is mutable but named like a constant; consider lowercase or make it `final`.
private static ThreadPoolExecutor EXECUTOR_SERVICE =
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:225
- [nitpick] Abbreviated variable `suc` is unclear; rename to `success` for readability.
boolean suc = false;
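The naming nitpicks on the thread factory could be addressed along these lines. This is a minimal sketch, not the PR's `InlongThreadFactory`: the class name `NamedThreadFactory`, the name prefix, and the daemon flag are assumptions chosen for illustration; only the `threadCounter` name comes from the review comment.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory (not from the PR) that names threads "<prefix>-<n>". */
final class NamedThreadFactory implements ThreadFactory {
    // Descriptive name instead of the Hungarian-style mThreadNum.
    private final AtomicInteger threadCounter = new AtomicInteger(1);
    private final String prefix;

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = new Thread(task, prefix + "-" + threadCounter.getAndIncrement());
        thread.setDaemon(true); // assumption: sender threads should not block JVM shutdown
        return thread;
    }
}
```

A factory like this can be passed to a `ThreadPoolExecutor` held in a `private static final` field, which would also resolve the `EXECUTOR_SERVICE` comment.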
* At this semantics, we will directly send the message to pulsar, the data may duplicat/lost if
* job restart/retry or network error.
Fix typo and improve grammar: use 'duplicated or lost' instead of 'duplicat/lost', and consider 'At this semantic'.
- * At this semantics, we will directly send the message to pulsar, the data may duplicat/lost if
- * job restart/retry or network error.
+ * In this semantic, we will directly send the message to Pulsar. The data may be duplicated or lost if
+ * the job restarts/retries or encounters a network error.
Copilot uses AI. Check for mistakes.
}
InlongUtil.silenceSleepInMs(BATCH_SEND_INTERVAL);
}
LOG.info("start send {}:{}", groupId, streamId);
The final log incorrectly repeats 'start send'; change it to 'stop send' to reflect shutdown.
- LOG.info("start send {}:{}", groupId, streamId);
+ LOG.info("stop send {}:{}", groupId, streamId);
"message size is {}, greater than max pack size {}, drop it!",
peekMessage.length,
batchSendLen);
rowQue.remove();
This extra `remove()` call can drop an unintended message. Only remove once when discarding oversized entries.
- rowQue.remove();
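The intended behavior, removing an oversized entry exactly once, can be sketched as follows. This is an illustrative reconstruction under assumptions, not the PR's `InlongSinkWriter` code: the class `BatchDrain` and method `nextBatch` are hypothetical, the queue is assumed to hold `byte[]` payloads, and the sketch continues after a drop rather than breaking out of the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical helper (not from the PR) that builds one size-limited batch from a queue. */
final class BatchDrain {
    private BatchDrain() {}

    /**
     * Moves messages from rowQue into a batch whose total size stays within batchSendLen.
     * A message larger than batchSendLen can never be sent, so it is removed once and skipped.
     */
    static List<byte[]> nextBatch(Queue<byte[]> rowQue, int batchSendLen) {
        List<byte[]> batch = new ArrayList<>();
        int batchLen = 0;
        byte[] head;
        while ((head = rowQue.peek()) != null) {
            if (head.length > batchSendLen) {
                rowQue.remove(); // single removal: only the oversized head is dropped
                continue;
            }
            if (batchLen + head.length > batchSendLen) {
                break; // head stays queued for the next batch
            }
            batch.add(rowQue.remove());
            batchLen += head.length;
        }
        return batch;
    }
}
```

Because every `remove()` is paired with the `peek()` that inspected the same element, no message other than the oversized one can be discarded.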
public static final int MAX_RETRY = 5;
public static final int ERROR_LOG_SAMPLE = 10;
public static final int RESEND_QUEUE_WAIT_MS = 10;
private final long WRITE_INTERVAL_MS = 10;
[nitpick] Constant timing values should be declared `static final` to avoid per-instance allocation.
- private final long WRITE_INTERVAL_MS = 10;
+ private static final long WRITE_INTERVAL_MS = 10;
private final int msgCnt;

SenderCallback(SenderMessage message, int retry) {
    this.message = message;
    this.retry = retry;
    this.msgCnt = message.getDataList().size();
Field `msgCnt` is never used; consider removing it to reduce dead code.
- private final int msgCnt;
- SenderCallback(SenderMessage message, int retry) {
-     this.message = message;
-     this.retry = retry;
-     this.msgCnt = message.getDataList().size();
+ // Removed unused field `msgCnt`.
+ SenderCallback(SenderMessage message, int retry) {
+     this.message = message;
+     this.retry = retry;
+     // Removed initialization of unused field `msgCnt`.
</parent>

<artifactId>connector-inlong</artifactId>
<name>SeaTunnel : Connectors V2 : Pulsar</name>
Update the module `<name>` to reflect InLong instead of Pulsar.
Description
Background
Apache InLong is a one-stop data integration framework that supports efficient streaming/batch data collection, aggregation, and distribution from diverse sources.
Apache InLong focuses on ultra-large-scale data integration but supports a limited range of data sources, whereas SeaTunnel is compatible with many. Adding an InLong Sink Connector to SeaTunnel lets users feed data from SeaTunnel's rich set of sources into InLong, combining versatile data ingestion with large-scale integration. It also gives SeaTunnel seamless integration with the InLong ecosystem, which benefits real-time data pipelines in scenarios such as log/metrics synchronization and multi-system data distribution.
Proposal
Implement an Apache InLong Connector for SeaTunnel with the following features:
Module Structure
Usage Scenario
Distributing SeaTunnel-processed results through InLong to downstream systems (e.g., real-time dashboards, risk engines).
Suggested Solution
Purpose of this pull request
Add Apache InLong Connector Support
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
release-note