Details
Type: New Feature
Status: Closed
Priority: Minor
Resolution: Fixed
Description
OceanBase is a distributed relational database. Its community edition is open source and available at https://github.com/oceanbase/oceanbase.
The enterprise edition of OceanBase is compatible with MySQL and Oracle, so we can reuse almost all of the existing MySQL and Oracle dialect rules.
Unlike other databases, OceanBase requires the compatibility mode to be specified up front so that the connector can determine which dialect to use. A startup option such as 'compatible-mode' is therefore needed.
A dialect implementation for OceanBase could look like this:
package org.apache.flink.connector.jdbc.databases.oceanbase;

import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nonnull;

import java.util.Optional;
import java.util.Set;

/** JDBC dialect for OceanBase. */
public class OceanBaseDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    private final AbstractDialect dialect;

    public OceanBaseDialect(@Nonnull String compatibleMode) {
        switch (compatibleMode.toLowerCase()) {
            case "mysql":
                this.dialect = new MySqlDialect();
                break;
            case "oracle":
                this.dialect = new OracleDialect();
                break;
            default:
                throw new IllegalArgumentException(
                        "Unsupported compatible mode: " + compatibleMode);
        }
    }

    @Override
    public String dialectName() {
        return "OceanBase";
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.oceanbase.jdbc.Driver");
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        return dialect.supportedTypes();
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return dialect.getRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return dialect.getLimitClause(limit);
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return dialect.quoteIdentifier(identifier);
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] conditionFields) {
        return dialect.getUpsertStatement(tableName, fieldNames, conditionFields);
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return dialect.timestampPrecisionRange();
    }

    @Override
    public Optional<Range> decimalPrecisionRange() {
        return dialect.decimalPrecisionRange();
    }

    @Override
    public String appendDefaultUrlProperties(String url) {
        return dialect.appendDefaultUrlProperties(url);
    }
}
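To make the delegation idea easier to try without a Flink dependency, here is a minimal, self-contained sketch of how a 'compatible-mode' value might select a delegate. Everything in it is hypothetical and for illustration only: the names CompatibleModeSketch, SimpleDialect, MySqlLike, OracleLike and forMode are not part of the connector, and the two stand-in dialects are simplified and do not reproduce the behavior of the real MySqlDialect or OracleDialect.

```java
import java.util.Locale;

/** Sketch of mode-based dialect delegation (hypothetical names, no Flink dependency). */
public class CompatibleModeSketch {

    /** Hypothetical, reduced stand-in for the connector's dialect contract. */
    public interface SimpleDialect {
        String quoteIdentifier(String identifier);

        String getLimitClause(long limit);
    }

    /** Simplified MySQL-style stand-in: backtick quoting and LIMIT. */
    public static final class MySqlLike implements SimpleDialect {
        @Override
        public String quoteIdentifier(String identifier) {
            return "`" + identifier + "`";
        }

        @Override
        public String getLimitClause(long limit) {
            return "LIMIT " + limit;
        }
    }

    /** Simplified Oracle-style stand-in: double-quote quoting and FETCH FIRST. */
    public static final class OracleLike implements SimpleDialect {
        @Override
        public String quoteIdentifier(String identifier) {
            return "\"" + identifier + "\"";
        }

        @Override
        public String getLimitClause(long limit) {
            return "FETCH FIRST " + limit + " ROWS ONLY";
        }
    }

    /**
     * Picks a delegate from the 'compatible-mode' value. The value is trimmed and
     * lower-cased with Locale.ROOT so the match does not depend on the JVM's default locale.
     */
    public static SimpleDialect forMode(String compatibleMode) {
        if (compatibleMode == null) {
            throw new IllegalArgumentException("compatible-mode must be provided");
        }
        switch (compatibleMode.trim().toLowerCase(Locale.ROOT)) {
            case "mysql":
                return new MySqlLike();
            case "oracle":
                return new OracleLike();
            default:
                throw new IllegalArgumentException(
                        "Unsupported compatible mode: " + compatibleMode);
        }
    }

    public static void main(String[] args) {
        // Print the clauses each mode would produce for the same logical request.
        for (String mode : new String[] {"MySQL", "oracle"}) {
            SimpleDialect dialect = forMode(mode);
            System.out.println(
                    mode + ": " + dialect.quoteIdentifier("id") + " / " + dialect.getLimitClause(10));
        }
    }
}
```

Failing fast on an unknown or missing mode, as the proposed constructor also does, means a misconfigured job is rejected at startup instead of producing SQL in the wrong dialect at runtime.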