Description
Inspired by Postgres [1]
Common use case - bulk data load through JDBC/ODBC interface. Currently it is only possible to execute single commands one by one. We already can batch them to improve performance, but there is still big room for improvement.
We should think of a completely new command - COPY. It will accept a file (or input stream in general case) on the client side, then transfer data to the cluster, and then execute update inside the cluster, e.g. through streamer.
First of all we need to create quick and dirty prototype to assess potential performance improvement. It speedup is confirmed, we should build base implementation which will accept only files. But at the same time we should understand how it will evolve in future: multiple file formats (probably including Hadoop formarts, e.g. Parquet), escape characters, input streams, etc..
[1] https://www.postgresql.org/docs/9.6/static/sql-copy.html
Proposed syntax
Curent implementation:
COPY FROM "file.name" INTO <schema>.<table> [(col-name, ...)] FORMAT <format-name> -- Only CSV format is supported in the current release [BATCH_SIZE <batch-size-in-bytes>]
We may want to gradually add features to this command in future to have something like this:
COPY FROM "file.name"[CHARSET "<charset-name>"] INTO <schema>.<table> [CREATE [IF NOT EXISTS]] [(col-name [<data-type>] [NULLABLE] [ESCAPES], ...) [MATCH HEADER]] FORMAT (csv|tsv|...) -- CSV format options: [FIELDSEP='column-separators-regexp'] [LINESEP='row-separators-regexp'] [QUOTE='quote-chars'] [ESCAPE='escape-char'] [NULL='null-sequence'] [COMMENT='single-line-comment-start-char'] [TRIM_LINES] [IMPORT_EMPTY_LINES] [CHARSET "<charset-name>"] [ROWS <first>-<last>] --or-- [SKIP ROWS <num>] [MAX ROWS <num>] [COLS <first>-<last>] --or-- [SKIP COLS <num>] [MAX COLS <num>] [(MATCH | SKIP) HEADER] [(REPLACE|IGNORE|ABORT ON [<max-error-number>])) DUPLICATE KEYS] [BATCH SIZE (<num> ROWS | <num>[K|M|G|T|P])] [COMPRESS "codec-name" [codec options]] [LOCK (TABLE|ROWS)] [NOLOGGING] [BACKEND (DIRECT | STREAMER)]
Implementation decisions and notes
Parsing
- We support CSV format described in RFC 4180.
- Custom row and column separators, quoting characters are currently hardcoded
- Escape sequences, line comment characters are currently not supported
- We may want to support fixed-length formats (via format descriptors) in future
- We may want to strip comments from lines (for example, starting with '#')
- We may want to allow user to either ignore empty lines or treat them as a special case of record having all default values
- We may allow user to enable whitespace trimming from beginning and end of a line
- We may want to allow user to specify error handling strategy: e.g., only one quote character is present or escape sequence is invalid.
File handling
- File character set to be supported in future
- Skipped/imported row number (or first/last line or skip header option), skipped/imported column number (or first/last column): to be supported in future
- Line start pattern (as in MySQL): no support planned
- We currently support only client-side import. No server-side file import.
- We may want to support client-side stdin import in future.
- We do not handle importing multiple files from single command
- We don't benefit from any kind of pre-sorting pre-partitioning data on client side.
- We don't include any any metadata, such as line number from client side.
Transferring data
- We send file data via batches. In future we will support batch size (specified with rows per batch or data block size
per batch). - We may want to implement data compression in future.
- We connect to single node in JDBC driver (no multi-node connections).
Cache/tables/column handling
- We don't create table in the bulk load command
- We may want to have and option for reading header row, which contains column names to match columns
- In future we may wish to support COLUMNS (col1, , col2, _, col3) syntax, where '' marker means a skipped column (MySQL uses '@dummy' for this)
Data types
- Data types are converted as if they were supplied to INSERT SQL command.
- We may want type conversion (automatic, custom using sql function, custom via Java code, string auto-trimming) in future.
- We will support optional null sequence ("\N") later
- We may want to allow user to specify what to do if the same record exists (e.g., ignore record, replace it, report error with a max. error counter before failing the command)
- We don't currently support any generated/autoincremented row IDs or any custom generators.
- We don't support any filtering/conditional expressions
- We don't support any files/recordsets/tables with multiple conversion errors generated during import.
Backend / Transactional / MVCC / other
- We may want an option to select how do we insert the data into cache: using cache.putAll(...), for example, or via data streamer interface (see BACKEND option)
- We don't use transactions
- We don't create locks on rows or tables.
- We don't try to minimize any indexing overhead (it's up to the user)
- We may want to minimize WAL impact in future via NOLOGGING option.
Miscellanea
- We don't supply an utility to load data
- We don't currently supply any java loaders (as in PG and MSSQL) that stream data (not neccessary from file)
- Security-related questions are out of scope of this JIRA
- We don't have triggers and constraints in Apache Ignite
Implementations from other vendors
PostgreSQL
COPY table_name [ ( column_name [, ...] ) ] FROM { 'filename' | STDIN } [ [ WITH ] [ BINARY ] [ OIDS ] [ DELIMITER [ AS ] 'delimiter' ] [ NULL [ AS ] 'null string' ] [ CSV [ HEADER ] [ QUOTE [ AS ] 'quote' ] [ ESCAPE [ AS ] 'escape' ] [ FORCE NOT NULL column_name [, ...] ] ] ]
(https://www.postgresql.org/docs/9.2/static/sql-copy.html)
Notes
- Server-side file import
- Client-side: only from STDIN
- Protocol implementation: via special command in the protocol
- Special bulk data loaders in implemented as part of JDBC driver package: org.postgresql.copy.CopyManager
(https://jdbc.postgresql.org/documentation/publicapi/org/postgresql/copy/CopyManager.html) - Custom loaders available (e.g., https://github.com/bytefish/PgBulkInsert.git)
MySQL
LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE 'file_name' [REPLACE | IGNORE] INTO TABLE tbl_name [PARTITION (partition_name [, partition_name] ...)] [CHARACTER SET charset_name] [{FIELDS | COLUMNS} [TERMINATED BY 'string'] [[OPTIONALLY] ENCLOSED BY 'char'] [ESCAPED BY 'char'] ] [LINES [STARTING BY 'string'] [TERMINATED BY 'string'] ] [IGNORE number {LINES | ROWS}] [(col_name_or_user_var [, col_name_or_user_var] ...)] [SET col_name={expr | DEFAULT}, [, col_name={expr | DEFAULT}] ...]
(https://dev.mysql.com/doc/refman/5.7/en/load-data.html)
Notes
- Both client- and server-side import
- Protocol implementation via a hack: if result set returned with column count == -1, read file name from server and send it immediately.
Microsoft SQL Server
BULK INSERT [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ] FROM 'data_file' [ WITH ( [ [ , ] BATCHSIZE = batch_size ] [ [ , ] CHECK_CONSTRAINTS ] [ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ] [ [ , ] DATAFILETYPE = { 'char' | 'native'| 'widechar' | 'widenative' } ] [ [ , ] DATASOURCE = 'data_source_name' ] [ [ , ] ERRORFILE = 'file_name' ] [ [ , ] ERRORFILE_DATASOURCE = 'data_source_name' ] [ [ , ] FIRSTROW = first_row ] [ [ , ] FIRE_TRIGGERS ] [ [ , ] FORMATFILE_DATASOURCE = 'data_source_name' ] [ [ , ] KEEPIDENTITY ] [ [ , ] KEEPNULLS ] [ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ] [ [ , ] LASTROW = last_row ] [ [ , ] MAXERRORS = max_errors ] [ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ] [ [ , ] ROWS_PER_BATCH = rows_per_batch ] [ [ , ] ROWTERMINATOR = 'row_terminator' ] [ [ , ] TABLOCK ] -- input file format options [ [ , ] FORMAT = 'CSV' ] [ [ , ] FIELDQUOTE = 'quote_characters'] [ [ , ] FORMATFILE = 'format_file_path' ] [ [ , ] FIELDTERMINATOR = 'field_terminator' ] [ [ , ] ROWTERMINATOR = 'row_terminator' ] )]
(https://docs.microsoft.com/en-us/sql/t-sql/statements/bulk-insert-transact-sql)
Notes
- Server-side import
- CLI utility to import from client side
- Protocol implementation: Special packet types: column definition and row
- Custom bulk data supplied in JDBC driver package: com.microsoft.sqlserver.jdbc.SqlServerBulkCopy.
Oracle
There is no bulk load SQL command. Bulk loading external data can be achieved via:
- Oracle External Tables
(https://docs.oracle.com/en/database/oracle/oracle-database/12.2/sutil/oracle-external-tables-concepts.html) - SQL*Loader
(https://docs.oracle.com/database/121/SUTIL/GUID-8D037494-07FA-4226-B507-E1B2ED10C144.htm#SUTIL3311). - There is a separate utility for Oracle TimesTen in-memory database:
https://docs.oracle.com/database/121/TTREF/util.htm#TTREF324
Vertica DB
COPY [ [db-name.]schema-name.]target-table ... [ ( { column-as-expression | column } ...... [ DELIMITER [ AS ] 'char' ] ...... [ ENCLOSED [ BY ] 'char' ] ...... [ ENFORCELENGTH ] ...... [ ESCAPE [ AS ] 'char' | NO ESCAPE ] ...... [ FILLER datatype] ...... [ FORMAT 'format' ] ...... [ NULL [ AS ] 'string' ] ...... [ TRIM 'byte' ] ... [, ... ] ) ] ... [ COLUMN OPTION ( column ...... [ DELIMITER [ AS ] 'char' ] ...... [ ENCLOSED [ BY ] 'char' ] ...... [ ENFORCELENGTH ] ...... [ ESCAPE [ AS ] 'char' | NO ESCAPE ] ...... [ FORMAT 'format' ] ...... [ NULL [ AS ] 'string' ] ...... [ TRIM 'byte' ] ... [, ... ] ) ] [ FROM { ...STDIN ...... [ BZIP | GZIP | LZO | UNCOMPRESSED ] ...| 'pathToData' [ ON nodename | ON ANY NODE ] ...... [ BZIP | GZIP | LZO | UNCOMPRESSED ] [, ...] ...| LOCAL {STDIN | 'pathToData'} ...... [ BZIP | GZIP | LZO | UNCOMPRESSED ] [, ...] ...| VERTICA source_database.[source_schema.]source_table[(source_column [,...]) ] } ] ...[ NATIVE .....| NATIVE VARCHAR .....| FIXEDWIDTH COLSIZES (integer [,...]) .....| ORC .....| PARQUET ...] ...[ WITH ] ......[ SOURCE source([arg=value [,...] ]) ] ......[ FILTER filter([arg=value [,...] ]) ] ......[ PARSER parser([arg=value [,...] ]) ] ...[ DELIMITER [ AS ] 'char' ] ...[ TRAILING NULLCOLS ] ...[ NULL [ AS ] 'string' ] ...[ ESCAPE [ AS ] 'char' | NO ESCAPE ] ...[ ENCLOSED [ BY ] 'char' ] ...[ RECORD TERMINATOR 'string' ] ...[ SKIP records ] ...[ SKIP BYTES integer ] ...[ TRIM 'byte' ] ...[ REJECTMAX integer ] ...[ REJECTED DATA {'path' [ ON nodename ] [, ...] | AS TABLE reject-table} ] ...[ EXCEPTIONS 'path' [ ON nodename ] [, ...] ] ...[ ENFORCELENGTH ] ...[ ERROR TOLERANCE ] ...[ ABORT ON ERROR ] ...[ [ STORAGE ] load-method ] ...[ STREAM NAME 'streamName'] ...[ NO COMMIT ]
Various solutions from vendors not mentioned above
- Apache Hive:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Loadingfilesintotables - Apache HBase:
http://dwgeek.com/apache-hbase-bulk-load-csv-examples.html - SAP IQ:
https://help.sap.com/viewer/a898a5b484f21015a377cd1ccb6ee9b5/16.0.11.12/en-US/a6209de484f21015bcb2d858c21ab35e.html - VoltDB:
https://docs.voltdb.com/UsingVoltDB/clicsvloader.php - MemSQL:
https://docs.memsql.com/sql-reference/v6.0/load-data/
(creating pipelines and connecting them to LOAD DATA statement is also a notable feature) - Sybase:
http://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.help.sqlanywhere.12.0.1/dbreference/input-statement.html - IBM DB2:
https://www.ibm.com/support/knowledgecenter/en/SSEPGG_9.5.0/com.ibm.db2.luw.admin.cmd.doc/doc/r0008305.html - IBM Informix:
https://www.ibm.com/support/knowledgecenter/en/SSGU8G_12.1.0/com.ibm.sqls.doc/ids_sqs_0884.htm - Apache Derby (AKA Java DB, Apache DB):
https://db.apache.org/derby/docs/10.7/tools/ttoolsimporting.html - Google Cloud Spanner:
https://cloud.google.com/spanner/docs/bulk-loading
Attachments
Issue Links
- blocks
-
IGNITE-7709 SQL COPY: fix file name handling
- Resolved
-
IGNITE-7657 SQL COPY: minor style fixes
- Resolved
-
IGNITE-7539 SQL COPY command: implement NULL escape sequence handling
- Open
-
IGNITE-7537 SQL COPY command: implement CSV format options
- Reopened
-
IGNITE-7535 SQL COPY command: implement encoding option
- Resolved
-
IGNITE-7586 SQL COPY: add code examples
- Resolved
-
IGNITE-7541 SQL COPY command: implement backend switching option
- Closed
-
IGNITE-7544 SQL COPY command: implement statistics gathering and error reporting
- Closed
-
IGNITE-7545 SQL COPY command: implement duplicate handling strategy
- Closed
-
IGNITE-7546 SQL COPY command: implement CSV header row parsing
- Closed
-
IGNITE-7548 SQL COPY: handle CSV lines with less values than required correctly
- Closed
-
IGNITE-7549 SQL COPY: implement file transfer compression
- Closed
-
IGNITE-7554 SQL COPY: consider providing support for the command in batch mode
- Closed
-
IGNITE-7587 SQL COPY: document the command
- Closed
-
IGNITE-7601 SQL COPY: add tests for implicit columns (_key, _val, _ver)
- Closed
-
IGNITE-7602 SQL COPY: add tests for file reading and JDBC connection failures
- Closed
-
IGNITE-7605 SQL COPY: add more SQL parser tests for positive scenarios
- Closed
-
IGNITE-7681 SQL COPY: local performance improvements
- Closed
-
IGNITE-7724 SQL COPY: network performance improvements
- Closed
-
IGNITE-7542 SQL COPY command: implement multipliers (e.g., 'K' for kilo-, 'M' for mega-) in numbers
- Closed
-
IGNITE-7553 SQL COPY: provide support for the command in org.apache.ignite.internal.jdbc2 driver
- Closed
-
IGNITE-7661 SQL COPY: provide more tests for national Unicode characters (including surrogates and 0x10000+ range)
- Closed
-
IGNITE-7732 SQL COPY: ODBC support
- Open
-
IGNITE-7736 SQL COPY: streaming model for network packets instead of request-response model
- Open
-
IGNITE-7856 ODBC: Support COPY command
- Open
-
IGNITE-7737 SQL COPY: rename "batch_size" to "packet_size"
- Resolved
-
IGNITE-7733 SQL COPY: native API support
- Closed
-
IGNITE-7734 SQL COPY: support single quotes for file name
- Closed
-
IGNITE-7735 SQL COPY: expose streamer options to user
- Closed
- is related to
-
IGNITE-7846 SQL: support COPY command by native SQL
- Closed
- links to