Skip to content

[Catalog][DM][Improve] dm catalog and dm auto create table and auto create table add constraintKey #5572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
5db48e0
[imporve] dm use schema and table
XiaoJiang521 Sep 20, 2023
7cf5b1d
[imporve] create table add constraintKey and dm catalog
XiaoJiang521 Sep 27, 2023
da997b7
Merge branch 'dev' into dev_dm_udpdate_gettablename
XiaoJiang521 Oct 13, 2023
81a43ff
[imporve] Key update union Key
XiaoJiang521 Oct 13, 2023
ca12c4d
Merge branch 'dev' into dev_dm_udpdate_gettablename
XiaoJiang521 Oct 13, 2023
e377735
[e2e] Add dm catalog e2e
XiaoJiang521 Oct 13, 2023
1b25e64
[e2e] Add dm catalog e2e
XiaoJiang521 Oct 13, 2023
636e3b7
[e2e] Add dm catalog e2e
XiaoJiang521 Oct 13, 2023
39b30bc
[e2e] remove ""
XiaoJiang521 Oct 13, 2023
9c1668f
[e2e] add (
XiaoJiang521 Oct 13, 2023
89220e5
[e2e]
XiaoJiang521 Oct 13, 2023
12ffa74
[e2e] update dameng catalog
XiaoJiang521 Oct 14, 2023
88334c6
[e2e] add dm driver
XiaoJiang521 Oct 16, 2023
0b938a3
[e2e] e2e
XiaoJiang521 Oct 16, 2023
23dec25
[e2e] info
XiaoJiang521 Oct 16, 2023
cc28215
[e2e] update conf
XiaoJiang521 Oct 16, 2023
9ebdf58
Merge branch 'dev' into dev_dm_udpdate_gettablename
XiaoJiang521 Oct 16, 2023
3561491
[e2e] update
XiaoJiang521 Oct 16, 2023
15f4111
[e2e] update
XiaoJiang521 Oct 16, 2023
4ae253d
[e2e] order
XiaoJiang521 Oct 16, 2023
2de8d35
[e2e] create testuser
XiaoJiang521 Oct 16, 2023
547e6e0
[e2e] create testuser
XiaoJiang521 Oct 16, 2023
09b0dfd
[e2e] create testuser
XiaoJiang521 Oct 17, 2023
be4b909
[e2e] implent
XiaoJiang521 Oct 17, 2023
adadc66
[e2e] implent
XiaoJiang521 Oct 17, 2023
1d64fbc
[e2e] implent
XiaoJiang521 Oct 17, 2023
cd59074
[e2e] update TESTUSER
XiaoJiang521 Oct 17, 2023
6ce5165
[e2e] merge dev
XiaoJiang521 Oct 17, 2023
03346e0
[e2e] add schema
XiaoJiang521 Oct 17, 2023
737eedd
[e2e] update database
XiaoJiang521 Oct 17, 2023
96fa738
[e2e] update get table sql
XiaoJiang521 Oct 17, 2023
18cfacf
[e2e] update get table sql
XiaoJiang521 Oct 17, 2023
e603c3c
[e2e] remove default schema
XiaoJiang521 Oct 18, 2023
e15e07f
[e2e] update sql
XiaoJiang521 Oct 18, 2023
1b45439
[e2e] merge dev
XiaoJiang521 Oct 25, 2023
ea1aaf7
Merge branch 'dev' into dev_dm_udpdate_gettablename
XiaoJiang521 Oct 26, 2023
1d8aa87
[e2e] import package
XiaoJiang521 Oct 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -55,7 +55,10 @@ public abstract class Column implements Serializable {

protected final String comment;

/** Field type in the database * */
/**
* Field type in the database For example : varchar is varchar(50),DECIMAL is DECIMAL(20,5) ,
* int is int Each database can customize the sourceType according to its own characteristics*
*/
protected final String sourceType;

/** Unsigned bit * */
@@ -64,7 +67,13 @@ public abstract class Column implements Serializable {
/** Whether to use the 0 bit * */
protected final boolean isZeroFill;

/** Bit length * */
/**
* Bit length For different database byte types, it is possible to define the number of bits is
* not the same, so the byte types parameter is converted to bits according to the database at
* the time of construction, and then the bitLen is converted to the child end when the
* automatic table is builtinteger may be cross the border For example, Mysql has 8 bits of
* bytes: bitLen = bytesLen << 3
*/
protected final Long bitLen;

/** integer may be cross the border * */
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;

import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;

import java.util.List;
import java.util.stream.Collectors;

// todo Abstract the concatenation common content of jdbc auto-build statements
public abstract class AbstractJdbcCreateTableSqlBuilder {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to raise a pr for modification about AbstractJdbcCreateTableSqlBuilder and related classes in existing code firstly, and make the improvement on dm catalog in another pr.


protected boolean primaryCompareToConstrainKey(
PrimaryKey primaryKey, ConstraintKey constraintKey) {
List<String> columnNames = primaryKey.getColumnNames();
List<ConstraintKey.ConstraintKeyColumn> constraintKeyColumnNames =
constraintKey.getColumnNames();
return columnNames.stream()
.map(Object::toString)
.collect(Collectors.toList())
.containsAll(
constraintKeyColumnNames.stream()
.map(ConstraintKey.ConstraintKeyColumn::getColumnName)
.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
@@ -53,14 +53,19 @@ public class DamengCatalog extends AbstractJdbcCatalog {

private static final String SELECT_COLUMNS_SQL =
"SELECT COLUMNS.COLUMN_NAME, COLUMNS.DATA_TYPE, COLUMNS.DATA_LENGTH, COLUMNS.DATA_PRECISION, COLUMNS.DATA_SCALE "
+ ", COLUMNS.NULLABLE, COLUMNS.DATA_DEFAULT, COMMENTS.COMMENTS "
+ ", COLUMNS.NULLABLE, COLUMNS.DATA_DEFAULT, COMMENTS.COMMENTS ,"
+ "CASE \n"
+ " WHEN COLUMNS.DATA_TYPE IN ('CHAR', 'CHARACTER', 'VARCHAR', 'VARCHAR2', 'VARBINARY', 'BINARY') THEN COLUMNS.DATA_TYPE || '(' || COLUMNS.DATA_LENGTH || ')'\n"
+ " WHEN COLUMNS.DATA_TYPE IN ('NUMERIC', 'DECIMAL', 'NUMBER') AND COLUMNS.DATA_PRECISION IS NOT NULL AND COLUMNS.DATA_SCALE IS NOT NULL AND COLUMNS.DATA_PRECISION != 0 AND COLUMNS.DATA_SCALE != 0 THEN COLUMNS.DATA_TYPE || '(' || COLUMNS.DATA_PRECISION || ', ' || COLUMNS.DATA_SCALE || ')'\n"
+ " ELSE COLUMNS.DATA_TYPE\n"
+ " END AS SOURCE_TYPE \n"
+ "FROM ALL_TAB_COLUMNS COLUMNS "
+ "LEFT JOIN ALL_COL_COMMENTS COMMENTS "
+ "ON COLUMNS.OWNER = COMMENTS.SCHEMA_NAME "
+ "AND COLUMNS.TABLE_NAME = COMMENTS.TABLE_NAME "
+ "AND COLUMNS.COLUMN_NAME = COMMENTS.COLUMN_NAME "
+ "WHERE COLUMNS.OWNER = ? "
+ "AND COLUMNS.TABLE_NAME = ? "
+ "WHERE COLUMNS.OWNER = '%s' "
+ "AND COLUMNS.TABLE_NAME = '%s' "
+ "ORDER BY COLUMNS.COLUMN_ID ASC";

public DamengCatalog(
@@ -79,17 +84,17 @@ protected String getListDatabaseSql() {

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
throw new UnsupportedOperationException();
return new DamengCreateTableSqlBuilder(table).build(tablePath);
}

@Override
protected String getDropTableSql(TablePath tablePath) {
return String.format("DROP TABLE %s", getTableName(tablePath));
return String.format("DROP TABLE %s", tablePath.getSchemaAndTableName("\""));
}

@Override
protected String getTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName().toUpperCase();
return tablePath.getSchemaAndTableName();
}

@Override
@@ -115,6 +120,7 @@ protected String getSelectColumnsSql(TablePath tablePath) {
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
String typeName = resultSet.getString("DATA_TYPE");
String sourceTypeName = resultSet.getString("SOURCE_TYPE");
long columnLength = resultSet.getLong("DATA_LENGTH");
long columnPrecision = resultSet.getLong("DATA_PRECISION");
long columnScale = resultSet.getLong("DATA_SCALE");
@@ -123,20 +129,52 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
boolean isNullable = resultSet.getString("NULLABLE").equals("Y");

SeaTunnelDataType<?> type = fromJdbcType(typeName, columnPrecision, columnScale);

long bitLen = 0;
long longColumnLength = 0;
switch (typeName) {
case DamengDataTypeConvertor.DM_BIT:
bitLen = columnLength;
break;
case DamengDataTypeConvertor.DM_DECIMAL:
case DamengDataTypeConvertor.DM_TIMESTAMP:
case DamengDataTypeConvertor.DM_DATETIME:
case DamengDataTypeConvertor.DM_TIME:
columnLength = columnScale;
break;
case DamengDataTypeConvertor.DM_CHAR:
case DamengDataTypeConvertor.DM_CHARACTER:
case DamengDataTypeConvertor.DM_VARCHAR:
case DamengDataTypeConvertor.DM_VARCHAR2:
case DamengDataTypeConvertor.DM_LONGVARCHAR:
case DamengDataTypeConvertor.DM_CLOB:
case DamengDataTypeConvertor.DM_TEXT:
case DamengDataTypeConvertor.DM_LONG:
longColumnLength = columnLength;
break;
case DamengDataTypeConvertor.DM_BINARY:
case DamengDataTypeConvertor.DM_VARBINARY:
case DamengDataTypeConvertor.DM_BLOB:
case DamengDataTypeConvertor.DM_BFILE:
case DamengDataTypeConvertor.DM_IMAGE:
case DamengDataTypeConvertor.DM_LONGVARBINARY:
bitLen = columnLength * 8;
break;
default:
break;
}
return PhysicalColumn.of(
columnName,
type,
0,
((int) columnLength),
isNullable,
defaultValue,
columnComment,
typeName,
sourceTypeName,
false,
false,
0L,
bitLen,
null,
columnLength);
longColumnLength);
}

private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, long scale) {
Loading