[FLINK-36796][pipeline-connector][oracle]add oracle pipeline connector. #3995

Open
linjianchang wants to merge 1 commit into base: master

Conversation

linjianchang
Contributor

Add Oracle pipeline connector.

Contributor

joyCurry30 left a comment

Thank you for your contribution. I just left some comments.

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>1.9.8.Final</version>
Contributor

Use ${debezium.version}

Contributor

And the scope should be "provided".

Contributor Author

This has been modified.

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-source-e2e-tests</artifactId>
<version>cty-3.0-2.2-SNAPSHOT</version>
Contributor

Why do you depend on "flink-cdc-source-e2e-tests"?

Contributor Author

This has been removed.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-util</artifactId>
<version>3.4-SNAPSHOT</version>
Contributor

Please use "${project.version}".

Contributor Author

This has been modified.

<include>io.debezium:debezium-core</include>
<include>io.debezium:debezium-ddl-parser</include>
<include>io.debezium:debezium-connector-oracle</include>
<include>io.debezium:debezium-connector-mysql</include>
Contributor

Could you explain the rationale behind having a dependency on the MySQL CDC connector within the Oracle CDC connector implementation? I'd like to better understand how these components interact in this context.

Contributor Author

This has been removed.

<include>io.debezium:debezium-connector-oracle</include>
<include>io.debezium:debezium-connector-mysql</include>
<include>com.ververica:flink-connector-debezium</include>
<include>com.ververica:flink-connector-mysql-cdc</include>
Contributor

Same as above.

Contributor Author

This has been removed.

<include>org.antlr:antlr4-runtime</include>
<include>org.apache.kafka:*</include>
<include>mysql:mysql-connector-java</include>
<include>com.zendesk:mysql-binlog-connector-java</include>
Contributor

Same as above.

Contributor Author

This has been removed.

Comment on lines 34 to 38
public static final ConfigOption<String> JDBC_URL =
ConfigOptions.key("jdbc.url")
.stringType()
.noDefaultValue()
.withDescription("The jdbc url.");
Contributor

joyCurry30, Apr 19, 2025

Could we clarify the relationship between the JDBC URL and the individual hostname/port parameters?

If we already have a “jdbc.url” configuration field, is there still value in maintaining separate “hostname” and “port” parameters? Should these be mutually exclusive?

When using the “hostname” and “port” configuration, is there a parameter to explicitly specify the driver type (Thin vs OCI)? This would be crucial for constructing the correct JDBC connection string format.

Contributor Author

In the original Oracle module [flink-connector-oracle-cdc], the OracleJdbcUrlUtils#getConnectionUrlWithSid method does not treat url, hostname, and port as mutually exclusive. When the url option is null, the URL is built as url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname;. Oracle 19c, however, must be connected to through the service name, as in url = "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname;. So you can configure the URL explicitly to work with Oracle 19c.
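
For readers unfamiliar with the two thin-driver URL shapes mentioned in this reply, here is a minimal Java sketch. The class and method names (OracleJdbcUrlSketch, withSid, withServiceName, resolve) are hypothetical and are not the connector's actual OracleJdbcUrlUtils code. The fallback rule only mirrors what the reply describes, assuming an explicitly configured URL takes precedence over hostname and port.

```java
/** Illustrative helper only; NOT the connector's actual OracleJdbcUrlUtils. */
final class OracleJdbcUrlSketch {

    private OracleJdbcUrlSketch() {}

    /** SID form: jdbc:oracle:thin:@host:port:SID */
    static String withSid(String hostname, int port, String sid) {
        return "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + sid;
    }

    /** Service-name form: jdbc:oracle:thin:@host:port/service */
    static String withServiceName(String hostname, int port, String serviceName) {
        return "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + serviceName;
    }

    /**
     * Assumed precedence: a non-empty configured URL is used as-is;
     * otherwise the URL falls back to the SID form built from hostname and port.
     */
    static String resolve(String configuredUrl, String hostname, int port, String dbname) {
        if (configuredUrl != null && !configuredUrl.isEmpty()) {
            return configuredUrl;
        }
        return withSid(hostname, port, dbname);
    }
}
```

With this shape, a user who needs the service-name form would pass the full URL through the "jdbc.url" option, and everyone else keeps the hostname and port behavior unchanged.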

Comment on lines 140 to 143
if (isAddMeta) {
map.put(OracleDataSourceOptions.HOSTNAME.key(), hostname);
map.put(OracleDataSourceOptions.PORT.key(), port);
}
Contributor

Could you clarify the design intent behind treating hostname and port as common metadata fields?

Since we don't currently support multi-source CDC synchronization, are these fields intended for future extensibility?

Contributor Author

This has been removed.

github-actions bot added the docs label (Improvements or additions to documentation) on Apr 21, 2025
linjianchang
Contributor Author

@joyCurry30 I have made the changes. Please review again, thanks!

Contributor

joyCurry30 left a comment

Hi, thank you for your contribution. I left some comments on the docs.


```yaml
source:
  type: mysql
```
Contributor

"type" should be "oracle".

Comment on lines 172 to 173
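- `initial` (default): Performs an initial snapshot of the monitored database tables on first startup, then continues to read the latest binlog.
- `latest-offset`: Never performs a snapshot of the monitored database tables on first startup; the connector only reads from the end of the binlog, which means it only captures data changes made after the connector started.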
Contributor

Does Oracle CDC use binlog?

Comment on lines 261 to 271
<tr>
<td>
XMLTYPE
</td>
<td>VARCHAR(n)</td>
<td></td>
</tr>
<tr>
<td>
VARCHAR(n)<br>
VARCHAR2(n)<br>
NVARCHAR2(n)<br>
NCHAR(n)<br>
CHAR(n)<br>
</td>
<td>VARCHAR(n)</td>
<td></td>
</tr>
Contributor

These two elements should be merged.


```yaml
source:
  type: mysql
```
Contributor

Same as above.

Comment on lines 176 to 178
- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
- `latest-offset`: Never to perform snapshot on the monitored database tables upon first startup, just read from

Contributor

Same as above.

Comment on lines 189 to 190
## 数据类型映射

Contributor

Use English, please.

Comment on lines 86 to 96
.hostname(
config.getOptional(OracleDataSourceOptions.HOSTNAME).get())
.port(config.getOptional(OracleDataSourceOptions.PORT).get())
.databaseList(
config.getOptional(OracleDataSourceOptions.DATABASE)
.get()) // monitor oracledatabase
.tableList(
config.getOptional(OracleDataSourceOptions.TABLES)
.get()) // monitor productstable
.username(
config.getOptional(OracleDataSourceOptions.USERNAME).get())
.password(
config.getOptional(OracleDataSourceOptions.PASSWORD).get())
.includeSchemaChanges(true);
Contributor

Use config.get(ConfigOption option).

options.add(OracleDataSourceOptions.SCHEMALIST);
options.add(OracleDataSourceOptions.DATABASE);
options.add(OracleDataSourceOptions.TABLES);
options.add(METADATA_LIST);
Contributor

Use OracleDataSourceOptions.METADATA_LIST.

Comment on lines 194 to 198
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
Contributor

Use an enum to replace the switch-case.
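
One way this suggestion could look is sketched below. The enum name ScanStartupMode and its fromValue method are hypothetical, chosen for illustration rather than taken from the PR; only the five string values come from the constants quoted above. The idea is that the lookup replaces a switch over raw strings, and an unknown value fails in one place.

```java
/** Hypothetical enum for illustration; not the PR's actual code. */
enum ScanStartupMode {
    INITIAL("initial"),
    EARLIEST_OFFSET("earliest-offset"),
    LATEST_OFFSET("latest-offset"),
    SPECIFIC_OFFSET("specific-offset"),
    TIMESTAMP("timestamp");

    private final String value;

    ScanStartupMode(String value) {
        this.value = value;
    }

    /** The option string as a user would write it in the pipeline config. */
    String value() {
        return value;
    }

    /** Case-insensitive lookup; rejects null and unknown values. */
    static ScanStartupMode fromValue(String raw) {
        for (ScanStartupMode mode : values()) {
            if (mode.value.equalsIgnoreCase(raw)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported scan startup mode: " + raw);
    }
}
```

Callers would then branch on the enum constant (for example ScanStartupMode.fromValue("latest-offset")) instead of comparing strings, so adding a mode means adding one constant.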

Comment on lines 185 to 196
builder.hostname(config.getOptional(OracleDataSourceOptions.HOSTNAME).get())
.port(config.getOptional(OracleDataSourceOptions.PORT).get())
.database(
config.getOptional(OracleDataSourceOptions.DATABASE)
.get()) // monitor database
.schemaList(
config.getOptional(OracleDataSourceOptions.SCHEMALIST)
.get()) // monitor schema
.tableList(capturedTables) // monitor
// EMP table
.username(config.getOptional(OracleDataSourceOptions.USERNAME).get())
.password(config.getOptional(OracleDataSourceOptions.PASSWORD).get())
Contributor

Use config.get(ConfigOption option)

Comment on lines 213 to 243
<shadedPattern>
com.ververica.cdc.connectors.shaded.org.apache.kafka
</shadedPattern>
</relocation>
<relocation>
<pattern>org.antlr</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.org.antlr
</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.fasterxml
</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.google
</shadedPattern>
</relocation>
<relocation>
<pattern>com.esri.geometry</pattern>
<shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
</relocation>
<relocation>
<pattern>com.zaxxer</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.zaxxer
</shadedPattern>
Contributor

Why relocate to com.ververica.cdc.connectors?
Please use the correct package path.

Comment on lines 18 to 19
package com.apache.flink.cdc.connectors.oracle.dto;

Contributor

Please use org.apache.

Comment on lines 32 to 34
import com.apache.flink.cdc.connectors.oracle.source.OracleDataSource;
import com.apache.flink.cdc.connectors.oracle.source.OracleDataSourceOptions;
import com.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
Contributor

Please use org.apache.

import java.util.Set;
import java.util.stream.Collectors;

import static com.apache.flink.cdc.connectors.oracle.source.OracleDataSourceOptions.METADATA_LIST;
Contributor

Please use org.apache.

}
throw new IllegalArgumentException(
String.format(
"[%s] cannot be found in mysql metadata.",
Contributor

oracle metadata

Contributor Author

Thanks for the review! I have made the changes.

Labels: base, debezium, docs (Improvements or additions to documentation), oracle-cdc-connector
3 participants