[Feature][Connector-V2] Add Apache InLong Connector Support #9250

Open: 1 commit to merge into base branch dev

@luchunliang commented Apr 29, 2025

Description

Background
Apache InLong is a one-stop data integration framework that supports efficient streaming/batch data collection, aggregation, and distribution from diverse sources.

Apache InLong primarily focuses on ultra-large-scale data integration but supports only a limited range of data sources. SeaTunnel, in contrast, provides extensive data source compatibility. Adding an InLong Sink Connector to SeaTunnel lets users reuse SeaTunnel's rich data sources to feed data into InLong, bridging the gap between versatile data ingestion and large-scale integration. It also gives users seamless integration with the InLong ecosystem and strengthens real-time data pipelines for scenarios such as log/metrics synchronization and multi-system data distribution.

Proposal
Implement an Apache InLong Connector for SeaTunnel with the following features:

  • InLong Sink Connector: Write processed data to InLong clusters for downstream consumption or governance.
  • Support core InLong configurations (e.g., groupId, streamId, authentication).

Module Structure

  • Create a new module: seatunnel-connectors-v2/connector-inlong, referencing existing message queue connectors (e.g., Pulsar).
  • Leverage the InLong Java SDK (e.g., inlong-sdk-java) or direct API calls.

Usage Scenario

Distributing SeaTunnel-processed results through InLong to downstream systems (e.g., real-time dashboards, risk engines).
Suggested Solution

Purpose of this pull request

Add Apache InLong Connector Support

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@Copilot (Copilot AI, Contributor) left a comment

Pull Request Overview

This PR adds support for Apache InLong as a new sink connector in SeaTunnel, enabling users to write processed data into InLong clusters. Key changes include the implementation of core sink writer logic with retry and resend mechanisms, definition of state and commit objects, and integration with InLong’s Java SDK.

  • Added a new module for the InLong sink connector with complete sink writer, committer, and factory implementations.
  • Introduced utility methods and thread management for sending data with retry logic.
  • Provided unit tests and configuration properties for InLong connector functionality.

Reviewed Changes

Copilot reviewed 15 out of 17 changed files in this pull request and generated no comments.

Summary per file (file: description)
  • seatunnel-connectors-v2/connector-inlong/src/test/java/org/apache/seatunnel/connectors/seatunnel/inlong/InlongFactoryTest.java: Basic unit tests for the InlongSinkFactory.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/util/InlongUtil.java: Utility class for sleep methods with logging.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/state/*.java: New state and commit info classes required for sink state management.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/SenderMessage.java: Defines the message container for batch sending.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongThreadFactory.java: New thread factory implementation for the connector’s internal threads.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java: Core sink writer implementation featuring message batching, retry, and resend logic.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkFactory.java: Factory to create the InLong sink and define required options.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkCommitter.java: Minimal committer implementation returning empty commit info.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSink.java: Main sink implementation integrating writer, state serializer, and committer.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/exception/*.java: New exception and error code classes for error handling in the connector.
  • seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/config/*.java: Configuration property definitions and semantics enums for the InLong sink connector.
Files not reviewed (2)
  • plugin-mapping.properties: Language not supported
  • seatunnel-connectors-v2/connector-inlong/pom.xml: Language not supported
Comments suppressed due to low confidence (2)

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:290

  • The current logic removes an extra element from rowQue when a message exceeds batchSendLen, which may unintentionally drop the subsequent message. Consider removing the redundant rowQue.remove() call so that only the oversized message is dropped.
if (peekMessageLength > batchSendLen) { LOG.warn(...); rowQue.remove(); break; }
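
Editor's sketch: the review comment above concerns dropping an oversized message exactly once while batching. The following is a minimal, self-contained illustration of that idea, not the PR's actual loop. The class name OversizedDropSketch, the method nextBatch, and the parameter maxBatchBytes are hypothetical; they stand in for the rowQue and batchSendLen names quoted in the review. Unlike the quoted snippet, this sketch continues batching after a drop instead of breaking out, which is a design choice of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration; not the InlongSinkWriter implementation.
class OversizedDropSketch {

    /**
     * Moves messages from the queue into one batch of at most maxBatchBytes.
     * A message larger than maxBatchBytes is removed exactly once and never batched.
     */
    static List<byte[]> nextBatch(Queue<byte[]> queue, int maxBatchBytes) {
        List<byte[]> batch = new ArrayList<>();
        int batchBytes = 0;
        byte[] head;
        while ((head = queue.peek()) != null) {
            if (head.length > maxBatchBytes) {
                // Oversized: it can never fit, so discard this single element only.
                queue.remove();
                continue;
            }
            if (batchBytes + head.length > maxBatchBytes) {
                break; // Batch is full; the head stays queued for the next call.
            }
            batch.add(queue.remove());
            batchBytes += head.length;
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<byte[]> queue = new ArrayDeque<>();
        queue.add(new byte[4]);
        queue.add(new byte[20]); // oversized for a 10-byte limit
        queue.add(new byte[5]);
        List<byte[]> batch = nextBatch(queue, 10);
        // prints "2 messages batched, 0 left"
        System.out.println(batch.size() + " messages batched, " + queue.size() + " left");
    }
}
```

The key property is that each branch removes at most one element per iteration, so a message that follows an oversized one cannot be lost by accident.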

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:247

  • In the retry loop for sending messages, the code does not break out after exceeding MAX_RETRY, so the loop may never terminate when sending keeps failing. Consider adding a break or alternative error handling so the writer cannot hang indefinitely.
if (retry > MAX_RETRY) { ... }
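
Editor's sketch: one way to guarantee that a retry loop terminates is to make the retry budget part of the loop condition. The example below is an assumption-laden illustration, not the PR's code. BoundedRetrySketch, sendWithRetry, and backoffMs are hypothetical names; MAX_RETRY = 5 matches the constant value quoted elsewhere in this review.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration; not the InlongSinkWriter implementation.
class BoundedRetrySketch {
    // Same value as the MAX_RETRY constant quoted in this review; illustrative only.
    static final int MAX_RETRY = 5;

    /**
     * Calls sendOnce up to 1 + MAX_RETRY times, sleeping backoffMs between attempts.
     * Returns true on the first success, and false once the budget is exhausted
     * or the calling thread is interrupted.
     */
    static boolean sendWithRetry(BooleanSupplier sendOnce, long backoffMs) {
        for (int retry = 0; retry <= MAX_RETRY; retry++) {
            if (sendOnce.getAsBoolean()) {
                return true;
            }
            if (retry == MAX_RETRY) {
                break; // Budget exhausted: stop instead of looping forever.
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Preserve the interrupt for callers.
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        boolean ok = sendWithRetry(() -> { attempts[0]++; return false; }, 1L);
        // prints "ok=false, attempts=6"
        System.out.println("ok=" + ok + ", attempts=" + attempts[0]);
    }
}
```

A caller that receives false can then decide whether to raise a connector exception or hand the message to a resend queue; which of these suits the connector is left open here.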

@Hisoka-X
Member

Thanks @luchunliang. Please add an e2e test case.

@Hisoka-X
Member

Is it necessary to provide a MockServer to complete E2E test cases?

+1

@luchunliang
Author

Quoted from @Hisoka-X: Thanks @luchunliang . Please add e2e test case.

We have completed end-to-end testing offline. For this PR, we added test cases modeled on connector-activemq and connector-aerospike.

@hailin0
Member

hailin0 commented May 28, 2025

@nielifeng requested a review from Copilot May 29, 2025 02:19

@Copilot (Copilot AI, Contributor) left a comment

Pull Request Overview

Add an Apache InLong Sink connector module to SeaTunnel, including configuration, factory, writer, committer, and registration.

  • Introduce connector-inlong module with core sink implementation and state classes
  • Add configuration options, semantics enum, and error handling for InLong SDK
  • Register connector in plugin mapping and provide a simple factory test

Reviewed Changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 6 comments.

Summary per file (file: description)
  • InlongFactoryTest.java: Add basic factory identifier tests
  • util/InlongUtil.java: Add sleep helper with exception handling
  • state/InlongSinkState.java: Define empty sink state class
  • state/InlongCommitInfo.java: Define commit info class
  • state/InlongAggregatedCommitInfo.java: Define aggregated commit info class
  • sink/SenderMessage.java: Represent batched messages for sending
  • sink/InlongThreadFactory.java: Create named threads with placeholder tags
  • sink/InlongSinkWriter.java: Implement async sink writer with retry logic
  • sink/InlongSinkFactory.java: Provide table sink factory
  • sink/InlongSinkCommitter.java: Define no-op committer
  • sink/InlongSink.java: Tie writer and committer into SeaTunnel API
  • exception/InlongException.java: Wrap SeaTunnel runtime exceptions
  • exception/InlongErrorCode.java: Enumerate connector-specific error codes
  • config/SinkProperties.java: Define connector configuration options
  • config/InlongSemantics.java: Define delivery semantics enum
  • connector-inlong/pom.xml: Add module dependencies and metadata
  • plugin-mapping.properties: Register seatunnel.sink.Inlong connector
Comments suppressed due to low confidence (5)

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/config/InlongSemantics.java:26

  • The enum value NON is ambiguous; consider renaming to NONE or NO_SEMANTICS for clarity.
    NON,

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/config/SinkProperties.java:50

  • Update the description to reference InLong instead of Pulsar to match this connector.
                    .withDescription("Configure which fields are used as the key of the pulsar message.");

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongThreadFactory.java:33

  • [nitpick] Avoid Hungarian prefix m; rename mThreadNum to threadCounter or similar for consistency.
    private final AtomicInteger mThreadNum = new AtomicInteger(1);
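
Editor's sketch: for context on the naming nitpick above, a thread factory that numbers its threads with an AtomicInteger can look like the following. This is an illustration only and makes no claim about how InlongThreadFactory is written. NamedThreadFactorySketch, namePrefix, and the daemon setting are assumptions of the sketch; threadCounter is the name the review suggests.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not the InlongThreadFactory implementation.
class NamedThreadFactorySketch implements ThreadFactory {
    // Descriptive name instead of a Hungarian-style "m" prefix.
    private final AtomicInteger threadCounter = new AtomicInteger(1);
    private final String namePrefix;

    NamedThreadFactorySketch(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // getAndIncrement is atomic, so concurrent callers get distinct numbers.
        Thread thread = new Thread(task, namePrefix + "-" + threadCounter.getAndIncrement());
        // Assumption: background sender threads should not block JVM shutdown.
        thread.setDaemon(true);
        return thread;
    }

    public static void main(String[] args) {
        NamedThreadFactorySketch factory = new NamedThreadFactorySketch("inlong-sender");
        // prints "inlong-sender-1"
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```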

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:89

  • [nitpick] Field EXECUTOR_SERVICE is mutable but named like a constant; consider lowercase or make it final.
    private static ThreadPoolExecutor EXECUTOR_SERVICE =

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:225

  • [nitpick] Abbreviated variable suc is unclear; rename to success for readability.
        boolean suc = false;

Comment on lines +23 to +24
* At this semantics, we will directly send the message to pulsar, the data may duplicat/lost if
* job restart/retry or network error.

Copilot AI May 29, 2025

Fix the typo and improve the grammar: use 'may be duplicated or lost' instead of 'may duplicat/lost', and prefer 'In this semantic' to 'At this semantics'.

Suggested change
- * At this semantics, we will directly send the message to pulsar, the data may duplicat/lost if
- * job restart/retry or network error.
+ * In this semantic, we will directly send the message to Pulsar. The data may be duplicated or lost if
+ * the job restarts/retries or encounters a network error.

Copilot uses AI. Check for mistakes.

}
InlongUtil.silenceSleepInMs(BATCH_SEND_INTERVAL);
}
LOG.info("start send {}:{}", groupId, streamId);

Copilot AI May 29, 2025

The final log incorrectly repeats 'start send'; change it to 'stop send' to reflect shutdown.

Suggested change
- LOG.info("start send {}:{}", groupId, streamId);
+ LOG.info("stop send {}:{}", groupId, streamId);

"message size is {}, greater than max pack size {}, drop it!",
peekMessage.length,
batchSendLen);
rowQue.remove();

Copilot AI May 29, 2025

This extra remove() call can drop an unintended message. Only remove once when discarding oversized entries.

Suggested change
- rowQue.remove();

public static final int MAX_RETRY = 5;
public static final int ERROR_LOG_SAMPLE = 10;
public static final int RESEND_QUEUE_WAIT_MS = 10;
private final long WRITE_INTERVAL_MS = 10;

Copilot AI May 29, 2025

[nitpick] Constant timing values should be declared static final to avoid per-instance allocation.

Suggested change
- private final long WRITE_INTERVAL_MS = 10;
+ private static final long WRITE_INTERVAL_MS = 10;

Comment on lines +316 to +321
private final int msgCnt;

SenderCallback(SenderMessage message, int retry) {
this.message = message;
this.retry = retry;
this.msgCnt = message.getDataList().size();

Copilot AI May 29, 2025

Field msgCnt is never used; consider removing it to reduce dead code.

Suggested change
- private final int msgCnt;
- SenderCallback(SenderMessage message, int retry) {
- this.message = message;
- this.retry = retry;
- this.msgCnt = message.getDataList().size();
+ // Removed unused field `msgCnt`.
+ SenderCallback(SenderMessage message, int retry) {
+ this.message = message;
+ this.retry = retry;
+ // Removed initialization of unused field `msgCnt`.

</parent>

<artifactId>connector-inlong</artifactId>
<name>SeaTunnel : Connectors V2 : Pulsar</name>

Copilot AI May 29, 2025

Update the module <name> to reflect InLong instead of Pulsar.

@hailin0
Member

hailin0 commented May 29, 2025

  1. Add this connector to plugin_config:
    https://github.com/apache/seatunnel/blob/dev/config/plugin_config

Development

Successfully merging this pull request may close these issues.

[Feature][Connector-V2] Add Apache InLong Connector Support
3 participants