
[Feature] [Jdbc] custom configuration #5875


Open · wants to merge 2 commits into dev
JdbcConnectionConfig.java
@@ -36,6 +36,7 @@ public class JdbcConnectionConfig implements Serializable {
public String username;
public String password;
public String query;
public String beforeQuery;

public boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();

@@ -66,6 +67,7 @@ public static JdbcConnectionConfig of(ReadonlyConfig config) {
builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES));
builder.connectionCheckTimeoutSeconds(config.get(JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC));
builder.batchSize(config.get(JdbcOptions.BATCH_SIZE));
builder.beforeQuery(config.get(JdbcOptions.BEFORE_QUERY));
if (config.get(JdbcOptions.IS_EXACTLY_ONCE)) {
builder.xaDataSourceClassName(config.get(JdbcOptions.XA_DATA_SOURCE_CLASS_NAME));
builder.maxCommitAttempts(config.get(JdbcOptions.MAX_COMMIT_ATTEMPTS));
@@ -128,6 +130,10 @@ public int getMaxCommitAttempts() {
return maxCommitAttempts;
}

public String getBeforeQuery() {
return beforeQuery;
}

public Optional<Integer> getTransactionTimeoutSec() {
return transactionTimeoutSec < 0 ? Optional.empty() : Optional.of(transactionTimeoutSec);
}
@@ -150,6 +156,7 @@ public static final class Builder {
private String username;
private String password;
private String query;
private String beforeQuery;
private boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
private int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
private String xaDataSourceClassName;
@@ -203,6 +210,11 @@ public Builder query(String query) {
return this;
}

public Builder beforeQuery(String beforeQuery) {
this.beforeQuery = beforeQuery;
return this;
}

public Builder autoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
return this;
@@ -273,6 +285,7 @@ public JdbcConnectionConfig build() {
jdbcConnectionConfig.krb5Path = this.krb5Path;
jdbcConnectionConfig.properties =
this.properties == null ? new HashMap<>() : this.properties;
jdbcConnectionConfig.beforeQuery = this.beforeQuery;
return jdbcConnectionConfig;
}
}
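For readers skimming the diff, here is a minimal sketch of how the new beforeQuery hook is consumed through the builder. It assumes JdbcConnectionConfig exposes a static builder() factory (as of(ReadonlyConfig) above implies) and that the other setters mirror the existing fields; the connection values are purely illustrative.

// Illustrative only; everything except beforeQuery(...) and getBeforeQuery() is assumed, not taken from this PR.
JdbcConnectionConfig connectionConfig =
        JdbcConnectionConfig.builder()                              // assumed static factory
                .url("jdbc:mysql://localhost:3306/demo")            // hypothetical connection settings
                .username("root")
                .password("secret")
                .query("SELECT id, name FROM t_user")
                .beforeQuery("SET SESSION net_read_timeout = 600")  // new option added in this PR
                .build();
String beforeQuery = connectionConfig.getBeforeQuery();             // exposed by the new getter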
JdbcOptions.java
@@ -58,6 +58,9 @@ public interface JdbcOptions {
Option<String> QUERY =
Options.key("query").stringType().noDefaultValue().withDescription("query");

Option<String> BEFORE_QUERY =
Options.key("before_query").stringType().noDefaultValue().withDescription("SQL statement(s) to execute before the query, multiple statements separated by ';'");

Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
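A sketch of how the new key would be resolved at configuration time. ReadonlyConfig.fromMap is used here only for illustration (an assumption, not something this PR touches); the point is that one before_query value may carry several statements separated by ';'.

// Hypothetical job options; "before_query" is the only key introduced by this PR.
Map<String, Object> options = new HashMap<>();
options.put("url", "jdbc:mysql://localhost:3306/demo");
options.put("query", "SELECT * FROM t_user");
options.put("before_query", "SET SESSION group_concat_max_len = 4096; SET SESSION sql_mode = ''");

ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(options);                 // assumed factory method
JdbcConnectionConfig connectionConfig = JdbcConnectionConfig.of(readonlyConfig);
String beforeQuery = connectionConfig.getBeforeQuery();
// JdbcInputFormat later splits this value on ';' and runs each fragment before the main query.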
JdbcInputFormat.java
@@ -18,6 +18,7 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;

import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -56,6 +57,7 @@ public class JdbcInputFormat implements Serializable {
private final JdbcRowConverter jdbcRowConverter;
private final Map<TablePath, SeaTunnelRowType> tables;
private final ChunkSplitter chunkSplitter;
private final String beforeQuery;

private transient String splitTableId;
private transient SeaTunnelRowType splitRowType;
@@ -70,9 +72,18 @@ public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, SeaTunnelRowType>
this.chunkSplitter = ChunkSplitter.create(config);
this.jdbcRowConverter = jdbcDialect.getRowConverter();
this.tables = tables;
this.beforeQuery = config.getJdbcConnectionConfig().getBeforeQuery();
}

public void openInputFormat() {}
public void openInputFormat() throws SQLException {
    // Execute any user-configured before_query statements (split on ';') ahead of the main query
    if (StringUtils.isNotBlank(beforeQuery)) {
        String[] customConfig = beforeQuery.split(";");
        for (String config : customConfig) {
            statement.execute(config);
        }
    }
}

public void closeInputFormat() throws IOException {
close();
@@ -86,7 +97,7 @@ public void closeInputFormat() throws IOException {
* Connects to the source database and executes the query
*
* @param inputSplit which is ignored if this InputFormat is executed as a non-parallel source,
* a "hook" to the query parameters otherwise (using its <i>parameterId</i>)
* a "hook" to the query parameters otherwise (using its <i>parameterId</i>)
* @throws IOException if there's an error during the execution of the query
*/
public void open(JdbcSourceSplit inputSplit) throws IOException {
@@ -136,7 +147,9 @@ public boolean reachedEnd() {
return !hasNext;
}

/** Convert a row of data to seatunnelRow */
/**
* Convert a row of data to seatunnelRow
*/
public SeaTunnelRow nextRecord() {
try {
if (!hasNext) {
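One note on the openInputFormat() change above: the statement it calls execute(...) on is not declared anywhere in the visible hunk, so it presumably refers to an existing field of JdbcInputFormat. For reference, a minimal sketch of the usual JDBC shape, where the helper name and the way the Connection is obtained are assumptions rather than part of this PR:

// Sketch only: run each before_query fragment on a short-lived Statement.
private void runBeforeQuery(Connection connection, String beforeQuery) throws SQLException {
    try (Statement statement = connection.createStatement()) {
        for (String sql : beforeQuery.split(";")) {
            if (!sql.trim().isEmpty()) {          // ignore empty fragments, e.g. a trailing ';'
                statement.execute(sql.trim());
            }
        }
    }
}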