[FLINK-36796][pipeline-connector][oracle]add oracle pipeline connector. #3995
base: master
Thank you for your contribution. I have left some comments.
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>1.9.8.Final</version>
Use ${debezium.version}
And the scope should be "provided".
This has been modified.
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-cdc-source-e2e-tests</artifactId>
    <version>cty-3.0-2.2-SNAPSHOT</version>
Why do you depend on "flink-cdc-source-e2e-tests"?
This has been removed.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-test-util</artifactId>
    <version>3.4-SNAPSHOT</version>
Please use "${project.version}".
This has been modified.
<include>io.debezium:debezium-core</include>
<include>io.debezium:debezium-ddl-parser</include>
<include>io.debezium:debezium-connector-oracle</include>
<include>io.debezium:debezium-connector-mysql</include>
Could you explain the rationale behind having a dependency on the MySQL CDC connector within the Oracle CDC connector implementation? I'd like to better understand how these components interact in this context.
This has been removed.
<include>io.debezium:debezium-connector-oracle</include>
<include>io.debezium:debezium-connector-mysql</include>
<include>com.ververica:flink-connector-debezium</include>
<include>com.ververica:flink-connector-mysql-cdc</include>
Same as above.
This has been removed.
<include>org.antlr:antlr4-runtime</include>
<include>org.apache.kafka:*</include>
<include>mysql:mysql-connector-java</include>
<include>com.zendesk:mysql-binlog-connector-java</include>
Same as above.
This has been removed.
public static final ConfigOption<String> JDBC_URL =
        ConfigOptions.key("jdbc.url")
                .stringType()
                .noDefaultValue()
                .withDescription("The jdbc url.");
Could we clarify the relationship between the JDBC URL and the individual hostname/port parameters?
If we already have a "jdbc.url" configuration field, is there still value in maintaining separate "hostname" and "port" parameters? Should these be mutually exclusive?
When using the "hostname" and "port" configuration, is there a parameter to explicitly specify the driver type (Thin vs OCI)? This would be crucial for constructing the correct JDBC connection string format.
In the original Oracle module (flink-connector-oracle-cdc), the OracleJdbcUrlUtils#getConnectionUrlWithSid method does not treat url, hostname, and port as mutually exclusive. When the url option is null, the URL is built as url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname;. However, Oracle 19c must be connected to through the service name, as in url = "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname;. Configuring the URL explicitly therefore lets you adapt to Oracle 19c.
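To make the two URL shapes in this reply concrete, here is a minimal sketch of the fallback it describes: an explicitly configured URL wins, and otherwise the SID form is assembled from hostname, port, and database name. The class `OracleUrlSketch` and its method names are hypothetical and are not the connector's actual `OracleJdbcUrlUtils`; only the two string formats come from the reply itself.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not the connector's OracleJdbcUrlUtils. */
final class OracleUrlSketch {

    private OracleUrlSketch() {}

    /** SID form quoted in the reply: jdbc:oracle:thin:@host:port:SID */
    static String sidUrl(String hostname, int port, String sid) {
        return "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + sid;
    }

    /** Service-name form quoted in the reply: jdbc:oracle:thin:@host:port/service */
    static String serviceNameUrl(String hostname, int port, String serviceName) {
        return "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + serviceName;
    }

    /**
     * Assumed precedence: a non-blank configured URL is used as-is;
     * otherwise the SID form is built from hostname, port, and database.
     */
    static String resolveUrl(String configuredUrl, String hostname, int port, String database) {
        return Optional.ofNullable(configuredUrl)
                .filter(url -> !url.trim().isEmpty())
                .orElseGet(() -> sidUrl(hostname, port, database));
    }

    public static void main(String[] args) {
        // No URL configured: falls back to the SID form.
        System.out.println(resolveUrl(null, "localhost", 1521, "ORCLCDB"));
        // URL configured explicitly, here in the service-name form.
        System.out.println(
                resolveUrl(serviceNameUrl("localhost", 1521, "ORCLPDB1"), "localhost", 1521, "ORCLCDB"));
    }
}
```

Under this precedence the options are not mutually exclusive: hostname and port act only as a fallback when no URL is given. The host, port, and database values in `main` are placeholders.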
if (isAddMeta) {
    map.put(OracleDataSourceOptions.HOSTNAME.key(), hostname);
    map.put(OracleDataSourceOptions.PORT.key(), port);
}
Could you clarify the design intent behind treating hostname and port as common metadata fields?
Since we don't currently support multi-source CDC synchronization, are these fields intended for future extensibility?
This has been removed.
820b720 to 5015bbb
@joyCurry30 I have made the changes. Please review again, thanks!
Hi, thank you for your contribution. I left some comments for the doc.
```yaml
source:
  type: mysql
```
“type” should be "oracle".
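- `initial` (default): performs an initial snapshot of the monitored database tables on first startup, then continues to read the latest binlog.
- `latest-offset`: never performs a snapshot of the monitored database tables on first startup; the connector reads only from the end of the binlog, which means it can only read data changes made after the connector started.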
Does Oracle CDC use binlog?
<tr>
  <td>
    XMLTYPE
  </td>
  <td>VARCHAR(n)</td>
  <td></td>
</tr>
<tr>
  <td>
    VARCHAR(n)<br>
    VARCHAR2(n)<br>
    NVARCHAR2(n)<br>
    NCHAR(n)<br>
    CHAR(n)<br>
  </td>
  <td>VARCHAR(n)</td>
  <td></td>
</tr>
These two elements should be merged.
```yaml
source:
  type: mysql
```
Same as above.
- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
- `latest-offset`: Never to perform snapshot on the monitored database tables upon first startup, just read from
Same as above.
## 数据类型映射
Use English, please.
5015bbb to f284d27
f6b25cc to 4f73a6f
.hostname(
        config.getOptional(OracleDataSourceOptions.HOSTNAME).get())
.port(config.getOptional(OracleDataSourceOptions.PORT).get())
.databaseList(
        config.getOptional(OracleDataSourceOptions.DATABASE)
                .get()) // monitor oracledatabase
.tableList(
        config.getOptional(OracleDataSourceOptions.TABLES)
                .get()) // monitor productstable
.username(
        config.getOptional(OracleDataSourceOptions.USERNAME).get())
.password(
        config.getOptional(OracleDataSourceOptions.PASSWORD).get())
.includeSchemaChanges(true);
Use config.get(ConfigOption option).
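The difference behind this suggestion can be sketched with small stand-in classes. `OptionSketch`, `ConfigSketch`, and the default port value are hypothetical and only mimic the shape of a configuration API. The sketch assumes that `get(option)` falls back to the option's default value, which is how Flink's own `Configuration.get` behaves, whereas `getOptional(option).get()` throws when the key is unset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

/** Hypothetical stand-in for a typed config option; not the Flink CDC class. */
final class OptionSketch<T> {
    final String key;
    final T defaultValue;

    OptionSketch(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

/** Hypothetical stand-in for a configuration object; not the Flink CDC class. */
final class ConfigSketch {
    private final Map<String, Object> values = new HashMap<>();

    <T> ConfigSketch set(OptionSketch<T> option, T value) {
        values.put(option.key, value);
        return this;
    }

    /** Empty when the key was never set; the default is NOT consulted. */
    @SuppressWarnings("unchecked")
    <T> Optional<T> getOptional(OptionSketch<T> option) {
        return Optional.ofNullable((T) values.get(option.key));
    }

    /** Returns the configured value, or the option's default when unset. */
    <T> T get(OptionSketch<T> option) {
        return getOptional(option).orElse(option.defaultValue);
    }
}

final class ConfigAccessDemo {
    // Assumed default for illustration only.
    static final OptionSketch<Integer> PORT = new OptionSketch<>("port", 1521);

    public static void main(String[] args) {
        ConfigSketch config = new ConfigSketch(); // "port" deliberately left unset

        // Falls back to the declared default.
        System.out.println("get(PORT) = " + config.get(PORT));

        try {
            config.getOptional(PORT).get(); // Optional is empty here
        } catch (NoSuchElementException e) {
            System.out.println("getOptional(PORT).get() failed: option not set");
        }
    }
}
```

Besides being shorter, the `get` form keeps declared defaults effective, so an omitted optional setting does not surface as a `NoSuchElementException` at startup.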
options.add(OracleDataSourceOptions.SCHEMALIST);
options.add(OracleDataSourceOptions.DATABASE);
options.add(OracleDataSourceOptions.TABLES);
options.add(METADATA_LIST);
Use OracleDataSourceOptions.METADATA_LIST.
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
Use an enum to replace the switch-case.
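One way to read this suggestion is to fold the five string constants into an enum that owns both the string values and the lookup. This is a sketch only: `StartupModeSketch` and `fromValue` are hypothetical names, and the `scan.startup.mode` key in the error message is an assumption; only the five mode strings come from the diff.

```java
import java.util.Arrays;

/** Hypothetical enum for illustration; only the mode strings come from the diff. */
enum StartupModeSketch {
    INITIAL("initial"),
    EARLIEST_OFFSET("earliest-offset"),
    LATEST_OFFSET("latest-offset"),
    SPECIFIC_OFFSET("specific-offset"),
    TIMESTAMP("timestamp");

    private final String value;

    StartupModeSketch(String value) {
        this.value = value;
    }

    String value() {
        return value;
    }

    /** Case-insensitive lookup; rejects unknown or null input with a clear message. */
    static StartupModeSketch fromValue(String raw) {
        return Arrays.stream(values())
                .filter(mode -> mode.value.equalsIgnoreCase(raw))
                .findFirst()
                .orElseThrow(
                        () ->
                                new IllegalArgumentException(
                                        // Option key assumed for illustration.
                                        "Unsupported scan.startup.mode: " + raw));
    }
}
```

With this shape, the caller resolves the mode once via `StartupModeSketch.fromValue(...)`, and adding a new mode means adding one enum constant rather than a constant plus a new `case` branch.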
builder.hostname(config.getOptional(OracleDataSourceOptions.HOSTNAME).get())
        .port(config.getOptional(OracleDataSourceOptions.PORT).get())
        .database(
                config.getOptional(OracleDataSourceOptions.DATABASE)
                        .get()) // monitor database
        .schemaList(
                config.getOptional(OracleDataSourceOptions.SCHEMALIST)
                        .get()) // monitor schema
        .tableList(capturedTables) // monitor
        // EMP table
        .username(config.getOptional(OracleDataSourceOptions.USERNAME).get())
        .password(config.getOptional(OracleDataSourceOptions.PASSWORD).get())
Use config.get(ConfigOption option)
4f73a6f to 3479f43
    <shadedPattern>
        com.ververica.cdc.connectors.shaded.org.apache.kafka
    </shadedPattern>
</relocation>
<relocation>
    <pattern>org.antlr</pattern>
    <shadedPattern>
        com.ververica.cdc.connectors.shaded.org.antlr
    </shadedPattern>
</relocation>
<relocation>
    <pattern>com.fasterxml</pattern>
    <shadedPattern>
        com.ververica.cdc.connectors.shaded.com.fasterxml
    </shadedPattern>
</relocation>
<relocation>
    <pattern>com.google</pattern>
    <shadedPattern>
        com.ververica.cdc.connectors.shaded.com.google
    </shadedPattern>
</relocation>
<relocation>
    <pattern>com.esri.geometry</pattern>
    <shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
</relocation>
<relocation>
    <pattern>com.zaxxer</pattern>
    <shadedPattern>
        com.ververica.cdc.connectors.shaded.com.zaxxer
    </shadedPattern>
Why relocate com.ververica.cdc.connectors?
Please use the correct package path.
package com.apache.flink.cdc.connectors.oracle.dto;
Please use org.apache.
import com.apache.flink.cdc.connectors.oracle.source.OracleDataSource;
import com.apache.flink.cdc.connectors.oracle.source.OracleDataSourceOptions;
import com.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
Please use org.apache.
import java.util.Set;
import java.util.stream.Collectors;

import static com.apache.flink.cdc.connectors.oracle.source.OracleDataSourceOptions.METADATA_LIST;
Please use org.apache.
}
throw new IllegalArgumentException(
        String.format(
                "[%s] cannot be found in mysql metadata.",
oracle metadata
Thanks for the review! I have made the changes.
13c6586 to ba183e3
ba183e3 to f451287
0c77212 to 0237401
add oracle pipeline connector.