Index: modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java (date 1519034013000)
+++ modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java (revision )
@@ -55,7 +55,7 @@
     /** {@inheritDoc} */
     @Override protected void execute0(String sql, Boolean isQuery) throws SQLException {
-        assert isQuery != null && !isQuery;
+        assert isQuery == null || !isQuery;

         long updCnt = conn.ignite().context().query().streamUpdateQuery(conn.cacheName(), conn.schemaName(),
             streamer, sql, getArgs());
Index: modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java (date 1519034013000)
+++ modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java (revision )
@@ -122,7 +122,7 @@
                     return cmd;
                 }
                 else
-                    throw errorUnexpectedToken(lex, CREATE, DROP, COPY, ALTER);
+                    throw errorUnexpectedToken(lex, CREATE, DROP, ALTER, COPY);

             case QUOTED:
             case MINUS:
Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java (date 1519034013000)
+++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java (revision )
@@ -100,6 +100,7 @@
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -189,6 +190,9 @@
     /** Pending status messages. */
     private final LinkedList pendingMsgs = new LinkedList<>();

+    /** All currently open client contexts. */
+    private final Set<SqlClientContext> cliCtxs = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
     /** Current cache that has a query running on it. */
     private final ThreadLocal curCache = new ThreadLocal<>();

@@ -259,11 +263,15 @@
         if (cancel && idx != null) {
             try {
-                while (!busyLock.tryBlock(500))
+                while (!busyLock.tryBlock(500)) {
                     idx.cancelAllQueries();

+                    closeAllSqlStreams();
+                }
+
                 return;
-            } catch (InterruptedException ignored) {
+            }
+            catch (InterruptedException ignored) {
                 U.warn(log, "Interrupted while waiting for active queries cancellation.");

                 Thread.currentThread().interrupt();
@@ -346,6 +354,32 @@
         }
     }

+    /**
+     * @param cliCtx Client context to register.
+     */
+    void registerClientContext(SqlClientContext cliCtx) {
+        A.notNull(cliCtx, "cliCtx");
+
+        cliCtxs.add(cliCtx);
+    }
+
+    /**
+     * @param cliCtx Client context to unregister.
+     */
+    void unregisterClientContext(SqlClientContext cliCtx) {
+        A.notNull(cliCtx, "cliCtx");
+
+        cliCtxs.remove(cliCtx);
+    }
+
+    /**
+     * Flush and close streamers on all currently open client contexts.
+ */ + private void closeAllSqlStreams() { + for (SqlClientContext cliCtx : cliCtxs) + U.close(cliCtx, log); + } + /** * Process schema propose message from discovery thread. * @@ -1974,13 +2008,7 @@ */ public List>> querySqlFields(final SqlFieldsQuery qry, final boolean keepBinary, final boolean failOnMultipleStmts) { - return querySqlFields(null, qry, keepBinary, failOnMultipleStmts); - } - - @SuppressWarnings("unchecked") - public FieldsQueryCursor> querySqlFields(final GridCacheContext cctx, final SqlFieldsQuery qry, - final boolean keepBinary) { - return querySqlFields(cctx, qry, keepBinary, true).get(0); + return querySqlFields(null, qry, null, keepBinary, failOnMultipleStmts); } /** @@ -1991,7 +2019,7 @@ * @return Cursor. */ public FieldsQueryCursor> querySqlFields(final SqlFieldsQuery qry, final boolean keepBinary) { - return querySqlFields(null, qry, keepBinary, true).get(0); + return querySqlFields(null, qry, null, keepBinary, true).get(0); } /** @@ -1999,14 +2027,16 @@ * * @param cctx Cache context. * @param qry Query. + * @param cliCtx Client context. * @param keepBinary Keep binary flag. * @param failOnMultipleStmts If {@code true} the method must throws exception when query contains * more then one SQL statement. * @return Cursor. */ @SuppressWarnings("unchecked") - public List>> querySqlFields(@Nullable final GridCacheContext cctx, - final SqlFieldsQuery qry, final boolean keepBinary, final boolean failOnMultipleStmts) { + public List>> querySqlFields(@Nullable final GridCacheContext cctx, + final SqlFieldsQuery qry, final SqlClientContext cliCtx, final boolean keepBinary, + final boolean failOnMultipleStmts) { checkxEnabled(); validateSqlFieldsQuery(qry); @@ -2034,7 +2064,7 @@ GridQueryCancel cancel = new GridQueryCancel(); List>> res = - idx.querySqlFields(schemaName, qry, keepBinary, failOnMultipleStmts, cancel); + idx.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel); if (cctx != null) sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx); @@ -2073,7 +2103,7 @@ * @param schemaName Schema name. * @param streamer Data streamer. * @param qry Query. - * @return Iterator. + * @return Update counter. */ public long streamUpdateQuery(@Nullable final String cacheName, final String schemaName, final IgniteDataStreamer streamer, final String qry, final Object[] args) { @@ -2098,6 +2128,33 @@ busyLock.leaveBusy(); } } + + /** + * @param schemaName Schema name. + * @param cliCtx Client context. + * @param qry Query. + * @param args Query arguments. + * @return Update counters. + */ + public List streamBatchedUpdateQuery(final String schemaName, final SqlClientContext cliCtx, + final String qry, final List args) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, null, new IgniteOutClosureX>() { + @Override public List applyx() throws IgniteCheckedException { + return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx); + } + }, true); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + finally { + busyLock.leaveBusy(); + } + } /** * Execute distributed SQL query. 
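Reviewer's note: the GridQueryProcessor changes above boil down to a small concurrent registry of open client contexts that the node drains on stop. The following self-contained sketch reproduces that pattern in isolation so it can be read without the surrounding Ignite plumbing; `ClientContextRegistry` and its method names are illustrative only, not part of this patch.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Standalone sketch of the register/unregister/close-all pattern used by GridQueryProcessor. */
public class ClientContextRegistry {
    /** Backed by a ConcurrentHashMap so contexts can be added and removed from many handler threads. */
    private final Set<AutoCloseable> cliCtxs = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /** Called when a client connection opens (cf. registerClientContext). */
    void register(AutoCloseable cliCtx) {
        cliCtxs.add(cliCtx);
    }

    /** Called when a client connection closes (cf. unregisterClientContext). */
    void unregister(AutoCloseable cliCtx) {
        cliCtxs.remove(cliCtx);
    }

    /** Called on node stop (cf. closeAllSqlStreams): closing a context flushes its streamers. */
    void closeAll() {
        for (AutoCloseable cliCtx : cliCtxs) {
            try {
                cliCtx.close(); // The patch uses U.close(cliCtx, log), which logs instead of throwing.
            }
            catch (Exception ignored) {
                // Best effort on shutdown.
            }
        }
    }
}
```

Note that in the patch SqlClientContext.close() unregisters itself from this very set; a ConcurrentHashMap-backed set tolerates that removal during iteration, which is why a plain HashSet would not do here.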
Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java (date 1519034013000) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java (revision ) @@ -106,7 +106,7 @@ * @throws IgniteCheckedException On error. */ @SuppressWarnings("unchecked") - public FieldsQueryCursor> runDdlStatement(String sql, SqlCommand cmd) throws IgniteCheckedException{ + public FieldsQueryCursor> runDdlStatement(String sql, SqlCommand cmd) throws IgniteCheckedException { IgniteInternalFuture fut; try { @@ -211,12 +211,7 @@ if (fut != null) fut.get(); - QueryCursorImpl> resCur = (QueryCursorImpl>)new QueryCursorImpl(Collections.singletonList - (Collections.singletonList(0L)), null, false); - - resCur.fieldsMeta(UPDATE_RESULT_META); - - return resCur; + return zeroCursor(); } catch (SchemaOperationException e) { throw convert(e); @@ -229,6 +224,19 @@ } } + /** + * @return Single-column, single-row cursor with 0 as number of updated records. + */ + @SuppressWarnings("unchecked") + public static QueryCursorImpl> zeroCursor() { + QueryCursorImpl> resCur = (QueryCursorImpl>)new QueryCursorImpl(Collections.singletonList + (Collections.singletonList(0L)), null, false); + + resCur.fieldsMeta(UPDATE_RESULT_META); + + return resCur; + } + /** * Execute DDL statement. * Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java (date 1519034013000) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java (revision ) @@ -85,20 +85,21 @@ * @param loc Local query flag. * @param idx Indexing. * @param conn Connection. - * @param fieldsQuery Original query. + * @param fieldsQry Original query. * @return Update plan. 
*/ public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx, - @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos) + @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQry, @Nullable Integer errKeysPos) throws IgniteCheckedException { - assert !prepared.isQuery(); - GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared); if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert) - return planForInsert(stmt, loc, idx, conn, fieldsQuery); + return planForInsert(stmt, loc, idx, conn, fieldsQry); + else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete) + return planForUpdate(stmt, loc, idx, conn, fieldsQry, errKeysPos); else - return planForUpdate(stmt, loc, idx, conn, fieldsQuery, errKeysPos); + throw new IgniteSQLException("Unsupported operation: " + prepared.getSQL(), + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } /** Index: modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java (revision ) @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -88,6 +89,9 @@ /** Kernel context. */ private final GridKernalContext ctx; + /** Client context. */ + private final SqlClientContext cliCtx; + /** Logger. */ private final IgniteLogger log; @@ -103,24 +107,9 @@ /** Current bulk load processors. */ private final ConcurrentHashMap bulkLoadRequests = new ConcurrentHashMap<>(); - /** Distributed joins flag. */ - private final boolean distributedJoins; - - /** Enforce join order flag. */ - private final boolean enforceJoinOrder; - - /** Collocated flag. */ - private final boolean collocated; - /** Replicated only flag. */ private final boolean replicatedOnly; - /** Lazy query execution flag. */ - private final boolean lazy; - - /** Skip reducer on update flag. */ - private final boolean skipReducerOnUpdate; - /** Automatic close of cursors. */ private final boolean autoCloseCursors; @@ -140,22 +129,38 @@ * @param autoCloseCursors Flag to automatically close server cursors. * @param lazy Lazy query execution flag. * @param skipReducerOnUpdate Skip reducer on update flag. + * @param stream Streaming flag. + * @param streamAllowOverwrites Streaming overwrites flag. + * @param streamParOps Number of parallel ops per cluster node during streaming. + * @param streamBufSize Buffer size per cluster node during streaming. + * @param streamFlushFreq Data streamers' flush timeout. * @param protocolVer Protocol version. 
*/ public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly, boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate, + boolean stream, boolean streamAllowOverwrites, int streamParOps, int streamBufSize, long streamFlushFreq, ClientListenerProtocolVersion protocolVer) { this.ctx = ctx; + + this.cliCtx = new SqlClientContext( + ctx, + distributedJoins, + enforceJoinOrder, + collocated, + lazy, + skipReducerOnUpdate, + stream, + streamAllowOverwrites, + streamParOps, + streamBufSize, + streamFlushFreq + ); + this.busyLock = busyLock; this.maxCursors = maxCursors; - this.distributedJoins = distributedJoins; - this.enforceJoinOrder = enforceJoinOrder; - this.collocated = collocated; this.replicatedOnly = replicatedOnly; this.autoCloseCursors = autoCloseCursors; - this.lazy = lazy; - this.skipReducerOnUpdate = skipReducerOnUpdate; this.protocolVer = protocolVer; log = ctx.log(getClass()); @@ -301,6 +306,8 @@ } bulkLoadRequests.clear(); + + U.close(cliCtx, log); } finally { busyLock.leaveBusy(); @@ -326,6 +333,8 @@ long qryId = QRY_ID_GEN.getAndIncrement(); + assert !cliCtx.isStream(); + try { String sql = req.sqlQuery(); @@ -347,17 +356,17 @@ qry = new SqlFieldsQueryEx(sql, false); - if (skipReducerOnUpdate) + if (cliCtx.isSkipReducerOnUpdate()) ((SqlFieldsQueryEx)qry).setSkipReducerOnUpdate(true); } qry.setArgs(req.arguments()); - qry.setDistributedJoins(distributedJoins); - qry.setEnforceJoinOrder(enforceJoinOrder); - qry.setCollocated(collocated); + qry.setDistributedJoins(cliCtx.isDistributedJoins()); + qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder()); + qry.setCollocated(cliCtx.isCollocated()); qry.setReplicatedOnly(replicatedOnly); - qry.setLazy(lazy); + qry.setLazy(cliCtx.isLazy()); if (req.pageSize() <= 0) return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Invalid fetch size: " + req.pageSize()); @@ -371,7 +380,7 @@ qry.setSchema(schemaName); - List>> results = ctx.query().querySqlFields(qry, true, + List>> results = ctx.query().querySqlFields(null, qry, cliCtx, true, protocolVer.compareTo(VER_2_3_0) < 0); FieldsQueryCursor> fieldsCur = results.get(0); @@ -569,11 +578,11 @@ qry = new SqlFieldsQueryEx(q.sql(), false); - qry.setDistributedJoins(distributedJoins); - qry.setEnforceJoinOrder(enforceJoinOrder); - qry.setCollocated(collocated); + qry.setDistributedJoins(cliCtx.isDistributedJoins()); + qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder()); + qry.setCollocated(cliCtx.isCollocated()); qry.setReplicatedOnly(replicatedOnly); - qry.setLazy(lazy); + qry.setLazy(cliCtx.isLazy()); qry.setSchema(schemaName); } @@ -601,10 +610,21 @@ * @param updCntsAcc Per query rows updates counter. * @param firstErr First error data - code and message. 
*/ + @SuppressWarnings("ForLoopReplaceableByForEach") private void executeBatchedQuery(SqlFieldsQueryEx qry, List updCntsAcc, IgniteBiTuple firstErr) { try { - List>> qryRes = ctx.query().querySqlFields(qry, true, true); + if (cliCtx.isStream()) { + List cnt = ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, qry.getSql(), + qry.batchedArguments()); + + for (int i = 0; i < cnt.size(); i++) + updCntsAcc.add(cnt.get(i).intValue()); + + return; + } + + List>> qryRes = ctx.query().querySqlFields(null, qry, cliCtx, true, true); for (FieldsQueryCursor> cur : qryRes) { if (cur instanceof BulkLoadContextCursor) Index: modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java (revision ) @@ -18,6 +18,7 @@ package org.apache.ignite.internal.jdbc.thin; import java.sql.Array; +import java.sql.BatchUpdateException; import java.sql.Blob; import java.sql.CallableStatement; import java.sql.Clob; @@ -33,13 +34,19 @@ import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; +import java.util.logging.Level; import java.util.logging.Logger; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.SqlStateCode; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult; @@ -94,6 +101,12 @@ /** Connection properties. */ private ConnectionProperties connProps; + /** Batch for streaming. */ + private List streamBatch; + + /** Last added query to recognize batches. */ + private String lastStreamQry; + /** * Creates new connection. * @@ -135,6 +148,53 @@ } } + /** + * @return Whether this connection is streamed or not. + */ + public boolean isStream() { + return connProps.isStream(); + } + + /** + * Add another query for batched execution. + * @param sql Query. + * @param args Arguments. + */ + synchronized void addBatch(String sql, List args) throws SQLException { + boolean newQry = (args == null || !F.eq(lastStreamQry, sql)); + + // Providing null as SQL here allows for recognizing subbatches on server and handling them more efficiently. + JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null); + + if (streamBatch == null) + streamBatch = new ArrayList<>(connProps.getStreamBatchSize()); + + streamBatch.add(q); + + // Null args means "addBatch(String)" was called on non-prepared Statement, + // we don't want to remember its query string. + lastStreamQry = (args != null ? sql : null); + + if (streamBatch.size() == connProps.getStreamBatchSize()) + executeBatch(); + } + + /** + * @throws SQLException if failed. 
+ */ + private void executeBatch() throws SQLException { + JdbcBatchExecuteResult res = sendRequest(new JdbcBatchExecuteRequest(schema, streamBatch)); + + streamBatch = null; + + lastStreamQry = null; + + if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) { + throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()), + res.errorCode(), res.updateCounts()); + } + } + /** {@inheritDoc} */ @Override public Statement createStatement() throws SQLException { return createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT); @@ -277,6 +337,15 @@ if (isClosed()) return; + if (!F.isEmpty(streamBatch)) { + try { + executeBatch(); + } + catch (SQLException e) { + LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e); + } + } + closed = true; cliIo.close(); Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java (date 1519034013000) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java (revision ) @@ -140,7 +140,7 @@ SqlFieldsQuery qry = new SqlFieldsQuery("select f.productId, p.name, f.price " + "from FactPurchase f, \"replicated-prod\".DimProduct p where p.id = f.productId "); - for (List o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); set1.add((Integer)o.get(0)); @@ -154,7 +154,7 @@ qry = new SqlFieldsQuery("select productId from FactPurchase group by productId"); - for (List o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(set0.add((Integer) o.get(0))); @@ -173,7 +173,7 @@ "where p.id = f.productId " + "group by f.productId, p.name"); - for (List o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(names.add((String)o.get(0))); @@ -190,7 +190,7 @@ "group by f.productId, p.name " + "having s >= 15"); - for (List o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertTrue(i(o, 1) >= 15); @@ -203,7 +203,7 @@ qry = new SqlFieldsQuery("select top 3 distinct productId " + "from FactPurchase f order by productId desc "); - for (List o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) { X.println("___ -> " + o); assertEquals(top--, o.get(0)); @@ -216,7 +216,7 @@ qry = new SqlFieldsQuery("select distinct productId " + "from FactPurchase f order by productId desc limit 2 offset 1"); - for (List o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) { + for (List o : qryProc.querySqlFields(cache.context(), qry, null, false, 
true).get(0).getAll()) { X.println("___ -> " + o); assertEquals(top--, o.get(0)); @@ -256,13 +256,13 @@ GridTestUtils.assertThrows(log, new Callable() { @Override public Object call() throws Exception { - qryProc.querySqlFields(cache.context(), qry, false, true); + qryProc.querySqlFields(cache.context(), qry, null, false, true); return null; } }, IgniteSQLException.class, "Multiple statements queries are not supported"); - List>> cursors = qryProc.querySqlFields(cache.context(), qry, false, false); + List>> cursors = qryProc.querySqlFields(cache.context(), qry, null, false, false); assertEquals(2, cursors.size()); @@ -274,7 +274,7 @@ GridTestUtils.assertThrows(log, new Callable() { @Override public Object call() throws Exception { - qryProc.querySqlFields(cache.context(), qry, false, false); + qryProc.querySqlFields(cache.context(), qry, null, false, false); return null; } Index: modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java (revision ) +++ modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java (revision ) @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest; +import org.apache.ignite.internal.processors.query.SqlClientContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Tests for streaming via thin driver. 
+ */ +public class JdbcThinStreamingSelfTest extends JdbcStreamingSelfTest { + /** */ + private int batchSize = 17; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + batchSize = 17; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + try (Connection c = createOrdinaryConnection()) { + execute(c, "DROP TABLE PUBLIC.T IF EXISTS"); + } + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception { + return JdbcThinAbstractSelfTest.connect(grid(0), "streaming=true&streamingFlushFrequency=" + + flushFreq + "&" + "streamingAllowOverwrite=" + allowOverwrite + "&streamingPerNodeBufferSize=1000&" + + "streamingBatchSize=" + batchSize); + } + + /** {@inheritDoc} */ + @Override protected Connection createOrdinaryConnection() throws SQLException { + return JdbcThinAbstractSelfTest.connect(grid(0), null); + } + + /** + * @throws Exception if failed. + */ + public void testStreamedBatchedInsert() throws Exception { + for (int i = 10; i <= 100; i += 10) + put(i, nameForId(i * 100)); + + try (Connection conn = createStreamedConnection(false)) { + assertStreamingOn(); + + try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " + + "(?, ?)")) { + for (int i = 1; i <= 100; i+=2) { + stmt.setInt(1, i); + stmt.setString(2, nameForId(i)); + stmt.setInt(3, i + 1); + stmt.setString(4, nameForId(i + 1)); + + stmt.addBatch(); + } + + stmt.executeBatch(); + } + } + + U.sleep(500); + + // Now let's check it's all there. + for (int i = 1; i <= 100; i++) { + if (i % 10 != 0) + assertEquals(nameForId(i), nameForIdInCache(i)); + else // All that divides by 10 evenly should point to numbers 100 times greater - see above + assertEquals(nameForId(i * 100), nameForIdInCache(i)); + } + } + + /** + * @throws SQLException if failed. + */ + public void testSimultaneousStreaming() throws Exception { + try (Connection anotherConn = createOrdinaryConnection()) { + execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " + + "\"cache_name=T,wrap_value=false\""); + } + + // Timeout to let connection close be handled on server side. 
+ U.sleep(500); + + try (Connection conn = createStreamedConnection(false, 10000)) { + assertStreamingOn(); + + PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)"); + + PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)"); + + try { + for (int i = 1; i <= 10; i++) { + firstStmt.setInt(1, i); + firstStmt.setString(2, nameForId(i)); + + firstStmt.executeUpdate(); + } + + for (int i = 51; i <= 67; i++) { + secondStmt.setInt(1, i); + secondStmt.setInt(2, i); + + secondStmt.executeUpdate(); + } + + for (int i = 11; i <= 50; i++) { + firstStmt.setInt(1, i); + firstStmt.setString(2, nameForId(i)); + + firstStmt.executeUpdate(); + } + + for (int i = 68; i <= 100; i++) { + secondStmt.setInt(1, i); + secondStmt.setInt(2, i); + + secondStmt.executeUpdate(); + } + + assertCacheEmpty(); + + SqlClientContext cliCtx = sqlClientContext(); + + HashMap> streamers = U.field(cliCtx, "streamers"); + + assertEquals(2, streamers.size()); + + assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet()); + } + finally { + U.closeQuiet(firstStmt); + + U.closeQuiet(secondStmt); + } + } + + // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush + // on connection close in any way. + U.sleep(1000); + + // Now let's check it's all there. + for (int i = 1; i <= 50; i++) + assertEquals(nameForId(i), nameForIdInCache(i)); + + for (int i = 51; i <= 100; i++) + assertEquals(i, grid(0).cache("T").get(i)); + } + + /** + * + */ + public void testStreamingWithMixedStatementTypes() throws Exception { + String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)"; + + String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')"; + + try (Connection conn = createStreamedConnection(false, 10000)) { + assertStreamingOn(); + + PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr); + + Statement secondStmt = conn.createStatement(); + + try { + for (int i = 1; i <= 100; i++) { + boolean usePrep = Math.random() > 0.5; + + boolean useBatch = Math.random() > 0.5; + + if (usePrep) { + firstStmt.setInt(1, i); + firstStmt.setString(2, nameForId(i)); + + if (useBatch) + firstStmt.addBatch(); + else + firstStmt.execute(); + } + else { + String sql = String.format(stmtStr, i, nameForId(i)); + + if (useBatch) + secondStmt.addBatch(sql); + else + secondStmt.execute(sql); + } + } + } + finally { + U.closeQuiet(firstStmt); + + U.closeQuiet(secondStmt); + } + } + + // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush + // on connection close in any way. + U.sleep(1000); + + // Now let's check it's all there. + for (int i = 1; i <= 100; i++) + assertEquals(nameForId(i), nameForIdInCache(i)); + } + + /** + * Check that there's nothing in cache. + */ + private void assertCacheEmpty() { + assertEquals(0, grid(0).cache(DEFAULT_CACHE_NAME).size(CachePeekMode.ALL)); + } + + /** + * @param conn Connection. + * @param sql Statement. + * @throws SQLException if failed. + */ + private static void execute(Connection conn, String sql) throws SQLException { + try (Statement s = conn.createStatement()) { + s.execute(sql); + } + } + + /** + * @return Active SQL client context. 
+ */ + private SqlClientContext sqlClientContext() { + Set ctxs = U.field(grid(0).context().query(), "cliCtxs"); + + assertFalse(F.isEmpty(ctxs)); + + assertEquals(1, ctxs.size()); + + return ctxs.iterator().next(); + } + + /** + * Check that streaming state on target node is as expected. + */ + private void assertStreamingOn() { + SqlClientContext cliCtx = sqlClientContext(); + + assertTrue(cliCtx.isStream()); + } + + /** {@inheritDoc} */ + @Override protected void assertStatementForbidden(String sql) { + batchSize = 1; + + super.assertStatementForbidden(sql); + } +} \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java (revision ) @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Container for connection properties passed by various drivers (JDBC drivers, possibly ODBC) having notion of an + * SQL connection - Ignite basically does not have one.

+ * Also contains anything that a driver may need to share between threads processing queries of a logically
+ * single client - see the JDBC thin driver for an example.
+ */
+public class SqlClientContext implements AutoCloseable {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /** Collocated flag. */
+    private final boolean collocated;
+
+    /** Lazy query execution flag. */
+    private final boolean lazy;
+
+    /** Skip reducer on update flag. */
+    private final boolean skipReducerOnUpdate;
+
+    /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */
+    private final boolean streamAllowOverwrite;
+
+    /** Parallel ops count per node for data streamer. */
+    private final int streamNodeParOps;
+
+    /** Node buffer size for data streamer. */
+    private final int streamNodeBufSize;
+
+    /** Auto flush frequency for streaming. */
+    private final long streamFlushTimeout;
+
+    /** Streamers for various caches. */
+    private final Map<String, IgniteDataStreamer<?, ?>> streamers;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * @param ctx Kernal context.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param collocated Collocated flag.
+     * @param lazy Lazy query execution flag.
+     * @param skipReducerOnUpdate Skip reducer on update flag.
+     * @param stream Streaming state flag.
+     * @param streamAllowOverwrite Allow overwrites for duplicate keys on streamed {@code INSERT}s.
+     * @param streamNodeParOps Parallel ops count per node for data streamer.
+     * @param streamNodeBufSize Node buffer size for data streamer.
+     * @param streamFlushTimeout Auto flush frequency for streaming.
+     */
+    public SqlClientContext(GridKernalContext ctx, boolean distributedJoins, boolean enforceJoinOrder,
+        boolean collocated, boolean lazy, boolean skipReducerOnUpdate, boolean stream, boolean streamAllowOverwrite,
+        int streamNodeParOps, int streamNodeBufSize, long streamFlushTimeout) {
+        this.ctx = ctx;
+        this.distributedJoins = distributedJoins;
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.collocated = collocated;
+        this.lazy = lazy;
+        this.skipReducerOnUpdate = skipReducerOnUpdate;
+        this.streamAllowOverwrite = streamAllowOverwrite;
+        this.streamNodeParOps = streamNodeParOps;
+        this.streamNodeBufSize = streamNodeBufSize;
+        this.streamFlushTimeout = streamFlushTimeout;
+
+        streamers = stream ? new HashMap<>() : null;
+
+        log = ctx.log(SqlClientContext.class.getName());
+
+        ctx.query().registerClientContext(this);
+    }
+
+    /**
+     * @return Collocated flag.
+     */
+    public boolean isCollocated() {
+        return collocated;
+    }
+
+    /**
+     * @return Distributed joins flag.
+     */
+    public boolean isDistributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
+     * @return Enforce join order flag.
+     */
+    public boolean isEnforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+
+    /**
+     * @return Lazy query execution flag.
+     */
+    public boolean isLazy() {
+        return lazy;
+    }
+
+    /**
+     * @return Skip reducer on update flag.
+     */
+    public boolean isSkipReducerOnUpdate() {
+        return skipReducerOnUpdate;
+    }
+
+    /**
+     * @return Streaming state flag (on or off).
+     */
+    public boolean isStream() {
+        return streamers != null;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Streamer for given cache.
+ */ + public IgniteDataStreamer streamerForCache(String cacheName) { + Map> curStreamers = streamers; + + if (curStreamers == null) + return null; + + IgniteDataStreamer res = curStreamers.get(cacheName); + + if (res != null) + return res; + + res = ctx.grid().dataStreamer(cacheName); + + IgniteDataStreamer exStreamer = curStreamers.putIfAbsent(cacheName, res); + + if (exStreamer == null) { + res.autoFlushFrequency(streamFlushTimeout); + + res.allowOverwrite(streamAllowOverwrite); + + if (streamNodeBufSize > 0) + res.perNodeBufferSize(streamNodeBufSize); + + if (streamNodeParOps > 0) + res.perNodeParallelOperations(streamNodeParOps); + + return res; + } + else { // Someone got ahead of us. + res.close(); + + return exStreamer; + } + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + ctx.query().unregisterClientContext(this); + + if (streamers == null) + return; + + for (IgniteDataStreamer s : streamers.values()) + U.close(s, log); + } +} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java (date 1519034013000) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java (revision ) @@ -505,7 +505,7 @@ /** * @return Local subquery flag. */ - @Nullable public boolean isLocalSubquery() { + public boolean isLocalSubquery() { return isLocSubqry; } } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java (date 1519034013000) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java (revision ) @@ -1992,6 +1992,18 @@ expression.getClass().getSimpleName() + ']'); } + /** + * Check if passed statement is insert statement eligible for streaming. + * + * @param nativeStmt Native statement. + * @return {@code True} if streamable insert. + */ + public static boolean isStreamableInsertStatement(PreparedStatement nativeStmt) { + Prepared prep = prepared(nativeStmt); + + return prep instanceof Insert && INSERT_QUERY.get((Insert)prep) == null; + } + /** * @param cond Condition. * @param o Object. 
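Reviewer's note: SqlClientContext.streamerForCache() above implements a get-or-create idiom in which a racing creator closes its own streamer and adopts the winner's. Here is a condensed, self-contained version of the same idiom; `ResourceRegistry` and `Factory` are illustrative names, not patch code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Condensed sketch of the get-or-create idiom from SqlClientContext#streamerForCache. */
public class ResourceRegistry<T extends AutoCloseable> {
    /** One resource per key (the patch keys streamers by cache name). */
    private final Map<String, T> resources = new ConcurrentHashMap<>();

    /** Creates the (possibly expensive) resource; only one instance may survive per key. */
    public interface Factory<T> {
        T create(String key) throws Exception;
    }

    /** Returns the resource for the key, creating it on first access. */
    public T getOrCreate(String key, Factory<T> factory) throws Exception {
        T res = resources.get(key);

        if (res != null)
            return res;

        res = factory.create(key);

        T existing = resources.putIfAbsent(key, res);

        if (existing == null)
            return res; // We won the race: our instance is published.

        res.close(); // Someone got ahead of us: discard ours and reuse theirs.

        return existing;
    }
}
```

One point worth flagging in review: the patch stores streamers in a plain HashMap yet still calls putIfAbsent on it and comments on racing creators; if two threads of the same logical client can really reach streamerForCache concurrently, a ConcurrentHashMap as in the sketch would be the safer backing map.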
Index: modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java (date 1519034013000) +++ modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java (revision ) @@ -65,6 +65,7 @@ import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest; import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable; import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinStreamingSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinInsertStatementSkipReducerOnUpdateSelfTest; @@ -121,9 +122,11 @@ suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStatementBatchingSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcErrorsSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingToPublicCacheTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcNoCacheStreamingSelfTest.class)); suite.addTest(new TestSuite(JdbcBlobTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinStreamingSelfTest.class)); // DDL tests. suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicPartitionedNearSelfTest.class)); Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java (date 1519034013000) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java (revision ) @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; +import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.lang.GridCloseableIterator; @@ -244,12 +245,18 @@ /** {@inheritDoc} */ @Override public List>> querySqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { return null; } /** {@inheritDoc} */ - @Override public long streamUpdateQuery(String spaceName, String qry, @Nullable Object[] params, + @Override public List streamBatchedUpdateQuery(String schemaName, String qry, List params, + SqlClientContext cliCtx) throws IgniteCheckedException { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params, 
IgniteDataStreamer streamer) throws IgniteCheckedException { return 0; } @@ -372,8 +379,8 @@ } /** {@inheritDoc} */ - @Override public boolean isInsertStatement(PreparedStatement nativeStmt) { - return false; + @Override public void checkStatementStreamable(PreparedStatement nativeStmt) { + // No-op. } /** {@inheritDoc} */ Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java (date 1519034013000) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java (revision ) @@ -92,6 +92,7 @@ import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO; @@ -100,6 +101,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO; import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor; import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory; @@ -149,7 +151,6 @@ import org.h2.api.ErrorCode; import org.h2.api.JavaObjectSerializer; import org.h2.command.Prepared; -import org.h2.command.dml.Insert; import org.h2.command.dml.NoOperation; import org.h2.engine.Session; import org.h2.engine.SysProperties; @@ -191,7 +192,7 @@ @SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"}) public class IgniteH2Indexing implements GridQueryIndexing { public static final Pattern INTERNAL_CMD_RE = Pattern.compile( - "^(create|drop)\\s+index|^alter\\s+table|^copy", Pattern.CASE_INSENSITIVE); + "^(create|drop)\\s+index|^alter\\s+table|^copy|^set|^flush", Pattern.CASE_INSENSITIVE); /* * Register IO for indexes. 
@@ -500,10 +501,10 @@
     }

     /** {@inheritDoc} */
-    @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException {
+    @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) {
         Connection conn = connectionForSchema(schemaName);

-        return prepareStatement(conn, sql, true);
+        return prepareStatementAndCaches(conn, sql);
     }

     /**
@@ -1013,7 +1014,60 @@
             throw new IgniteSQLException(e);
         }

-        return dmlProc.streamUpdateQuery(streamer, stmt, params);
+        return dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
+        SqlClientContext cliCtx) throws IgniteCheckedException {
+        if (cliCtx == null || !cliCtx.isStream()) {
+            U.warn(log, "Connection is not in streaming mode.");
+
+            return zeroBatchedStreamedUpdateResult(params.size());
+        }
+
+        final Connection conn = connectionForSchema(schemaName);
+
+        final PreparedStatement stmt = prepareStatementAndCaches(conn, qry);
+
+        if (GridSqlQueryParser.checkMultipleStatements(stmt))
+            throw new IgniteSQLException("Multiple statements queries are not supported for streaming mode.",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        checkStatementStreamable(stmt);
+
+        Prepared p = GridSqlQueryParser.prepared(stmt);
+
+        UpdatePlan plan = dmlProc.getPlanForStatement(schemaName, conn, p, null, true, null);
+
+        IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(plan.cacheContext().name());
+
+        if (streamer != null) {
+            List<Long> res = new ArrayList<>(params.size());
+
+            for (int i = 0; i < params.size(); i++)
+                res.add(dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params.get(i)));
+
+            return res;
+        }
+        else {
+            U.warn(log, "Streaming has been turned off by concurrent command.");
+
+            return zeroBatchedStreamedUpdateResult(params.size());
+        }
+    }
+
+    /**
+     * @param size Result size.
+     * @return List of given size filled with 0Ls.
+     */
+    private static List<Long> zeroBatchedStreamedUpdateResult(int size) {
+        Long[] res = new Long[size];
+
+        Arrays.fill(res, 0L);
+
+        return Arrays.asList(res);
     }

     /**
@@ -1399,7 +1453,7 @@
             fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);

         final QueryCursor<List<?>> res =
-            querySqlFields(schemaName, fqry, keepBinary, true, null).get(0);
+            querySqlFields(schemaName, fqry, null, keepBinary, true, null).get(0);

         final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
             @Override public Iterator<Cache.Entry<K, V>> iterator() {
@@ -1435,19 +1489,19 @@
      * Try executing query using native facilities.
      *
      * @param schemaName Schema name.
-     * @param qry Query.
+     * @param sql Query.
      * @return Result or {@code null} if cannot parse/process this query.
      */
-    private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry) {
+    private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, String sql) {
        // Heuristic check for fast return.
-        if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find())
+        if (!INTERNAL_CMD_RE.matcher(sql.trim()).find())
            return null;

        // Parse.
SqlCommand cmd; try { - SqlParser parser = new SqlParser(schemaName, qry.getSql()); + SqlParser parser = new SqlParser(schemaName, sql); cmd = parser.nextCommand(); @@ -1455,15 +1509,20 @@ if (parser.nextCommand() != null) return null; - // Currently supported commands are: CREATE/DROP INDEX/COPY/ALTER TABLE + // Currently supported commands are: + // CREATE/DROP INDEX + // COPY + // ALTER TABLE + // SET STREAMING + // FLUSH STREAMER if (!(cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand || - cmd instanceof SqlBulkLoadCommand || cmd instanceof SqlAlterTableCommand)) + cmd instanceof SqlAlterTableCommand || cmd instanceof SqlBulkLoadCommand)) return null; } catch (Exception e) { // Cannot parse, return. if (log.isDebugEnabled()) - log.debug("Failed to parse SQL with native parser [qry=" + qry.getSql() + ", err=" + e + ']'); + log.debug("Failed to parse SQL with native parser [qry=" + sql + ", err=" + e + ']'); if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK)) return null; @@ -1473,24 +1532,24 @@ if (e instanceof SqlParseException) code = ((SqlParseException)e).code(); - throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql() + ": " + e.getMessage(), + throw new IgniteSQLException("Failed to parse DDL statement: " + sql + ": " + e.getMessage(), code, e); } // Execute. if (cmd instanceof SqlBulkLoadCommand) { - FieldsQueryCursor> cursor = dmlProc.runNativeDmlStatement(qry.getSql(), cmd); + FieldsQueryCursor> cursor = dmlProc.runNativeDmlStatement(sql, cmd); return Collections.singletonList(cursor); } else { try { - FieldsQueryCursor> cursor = ddlProc.runDdlStatement(qry.getSql(), cmd); + FieldsQueryCursor> cursor = ddlProc.runDdlStatement(sql, cmd); return Collections.singletonList(cursor); } catch (IgniteCheckedException e) { - throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + "]: " + throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sql + "]: " + e.getMessage(), e); } } @@ -1514,8 +1573,8 @@ /** {@inheritDoc} */ @SuppressWarnings("StringEquality") @Override public List>> querySqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { - List>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry); + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) { + List>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry.getSql()); if (res != null) return res; @@ -1553,8 +1612,8 @@ // We may use this cached statement only for local queries and non queries. if (qry.isLocal() || !prepared.isQuery()) - return (List>>)doRunPrepared(schemaName, prepared, qry, null, null, - keepBinary, cancel); + return (List>>)doRunPrepared(schemaName, prepared, qry, null, cliCtx, + null, keepBinary, cancel); } } @@ -1584,7 +1643,7 @@ firstArg += prepared.getParameters().size(); - res.addAll(doRunPrepared(schemaName, prepared, newQry, twoStepQry, meta, keepBinary, cancel)); + res.addAll(doRunPrepared(schemaName, prepared, newQry, twoStepQry, cliCtx, meta, keepBinary, cancel)); if (parseRes.twoStepQuery() != null && parseRes.twoStepQueryKey() != null && !parseRes.twoStepQuery().explain()) @@ -1600,14 +1659,14 @@ * @param prepared H2 command. * @param qry Fields query with flags. * @param twoStepQry Two-step query if this query must be executed in a distributed way. + * @param cliCtx Client context, or {@code null} if not applicable. 
     * @param meta Metadata for {@code twoStepQry}.
      * @param keepBinary Whether binary objects must not be deserialized automatically.
      * @param cancel Query cancel state holder.
      * @return Query result.
      */
     private List<FieldsQueryCursor<List<?>>> doRunPrepared(String schemaName, Prepared prepared,
-        SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
-        GridQueryCancel cancel) {
+        SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, @Nullable SqlClientContext cliCtx,
+        List<GridQueryFieldMetadata> meta, boolean keepBinary, GridQueryCancel cancel) {
         String sqlQry = qry.getSql();

         boolean loc = qry.isLocal();
@@ -2276,10 +2335,10 @@
     }

     /** {@inheritDoc} */
-    @Override public boolean isInsertStatement(PreparedStatement nativeStmt) {
-        Prepared prep = GridSqlQueryParser.prepared(nativeStmt);
-
-        return prep instanceof Insert;
+    @Override public void checkStatementStreamable(PreparedStatement nativeStmt) {
+        if (!GridSqlQueryParser.isStreamableInsertStatement(nativeStmt))
+            throw new IgniteSQLException("Streaming mode supports only INSERT commands without subqueries.",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
     }

     /** {@inheritDoc} */
Index: modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java (revision )
+++ modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java (revision )
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Data streaming test for thick driver and no explicit caches.
+ */ +public class JdbcNoCacheStreamingSelfTest extends GridCommonAbstractTest { + /** JDBC URL. */ + private static final String BASE_URL = CFG_URL_PREFIX + + "cache=default@modules/clients/src/test/config/jdbc-config.xml"; + + /** Connection. */ + protected Connection conn; + + /** */ + protected transient IgniteLogger log; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + return getConfiguration0(gridName); + } + + /** + * @param gridName Grid name. + * @return Grid configuration used for starting the grid. + * @throws Exception If failed. + */ + private IgniteConfiguration getConfiguration0(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cache = defaultCacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setBackups(1); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setIndexedTypes( + Integer.class, Integer.class + ); + + cfg.setCacheConfiguration(cache); + cfg.setLocalHost("127.0.0.1"); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + ipFinder.setAddresses(Collections.singleton("127.0.0.1:47500..47501")); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setConnectorConfiguration(new ConnectorConfiguration()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @param allowOverwrite Allow overwriting of existing keys. + * @return Connection to use for the test. + * @throws Exception if failed. + */ + protected Connection createConnection(boolean allowOverwrite) throws Exception { + Properties props = new Properties(); + + props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true"); + props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500"); + + if (allowOverwrite) + props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true"); + + return DriverManager.getConnection(BASE_URL, props); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + U.closeQuiet(conn); + + ignite(0).cache(DEFAULT_CACHE_NAME).clear(); + + super.afterTest(); + } + + /** + * @throws Exception if failed. + */ + public void testStreamedInsert() throws Exception { + for (int i = 10; i <= 100; i += 10) + ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100); + + try (Connection conn = createConnection(false)) { + try (PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)")) { + for (int i = 1; i <= 100; i++) { + stmt.setInt(1, i); + stmt.setInt(2, i); + + stmt.executeUpdate(); + } + } + } + + U.sleep(500); + + // Now let's check it's all there. + for (int i = 1; i <= 100; i++) { + if (i % 10 != 0) + assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i)); + else // All that divides by 10 evenly should point to numbers 100 times greater - see above + assertEquals(i * 100, grid(0).cache(DEFAULT_CACHE_NAME).get(i)); + } + } + + /** + * @throws Exception if failed. 
+ */ + public void testStreamedInsertWithOverwritesAllowed() throws Exception { + for (int i = 10; i <= 100; i += 10) + ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100); + + try (Connection conn = createConnection(true)) { + try (PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)")) { + for (int i = 1; i <= 100; i++) { + stmt.setInt(1, i); + stmt.setInt(2, i); + + stmt.executeUpdate(); + } + } + } + + U.sleep(500); + + // Now let's check it's all there. + // i should point to i at all times as we've turned overwrites on above. + for (int i = 1; i <= 100; i++) + assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i)); + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java (revision ) @@ -24,6 +24,8 @@ import java.util.Properties; import javax.naming.RefAddr; import javax.naming.Reference; + +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.configuration.ClientConnectorConfiguration; import org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.util.typedef.F; @@ -146,15 +148,45 @@ private StringProperty sslFactory = new StringProperty("sslFactory", "Custom class name that implements Factory", null, null, false, null); + /** Turn on streaming mode on this connection. */ + private BooleanProperty stream = new BooleanProperty( + "streaming", "Turn on streaming mode on this connection", false, false); + + /** Turn on overwrite during streaming on this connection. */ + private BooleanProperty streamAllowOverwrite = new BooleanProperty( + "streamingAllowOverwrite", "Turn on overwrite during streaming on this connection", false, false); + + /** Number of parallel operations per cluster node during streaming. */ + private IntegerProperty streamParOps = new IntegerProperty( + "streamingPerNodeParallelOperations", "Number of parallel operations per cluster node during streaming", + 0, false, 0, Integer.MAX_VALUE); + + /** Buffer size per cluster node during streaming. */ + private IntegerProperty streamBufSize = new IntegerProperty( + "streamingPerNodeBufferSize", "Buffer size per cluster node during streaming", + 0, false, 0, Integer.MAX_VALUE); + + /** Server-side flush frequency during streaming. */ + private LongProperty streamFlushFreq = new LongProperty( + "streamingFlushFrequency", "Server-side flush frequency during streaming", + 0, false, 0, Long.MAX_VALUE); + + /** Batch size for streaming (number of commands to accumulate internally before sending over the wire). */ + private IntegerProperty streamBatchSize = new IntegerProperty( + "streamingBatchSize", "Batch size for streaming (number of commands to accumulate internally " + + "before actually sending over the wire)", IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE * 4, false, 1, + Integer.MAX_VALUE); + /** Properties array. 
*/ - private final ConnectionProperty [] propsArray = { + private final ConnectionProperty [] props = { host, port, distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseServerCursor, tcpNoDelay, lazy, socketSendBuffer, socketReceiveBuffer, skipReducerOnUpdate, sslMode, sslProtocol, sslKeyAlgorithm, sslClientCertificateKeyStoreUrl, sslClientCertificateKeyStorePassword, sslClientCertificateKeyStoreType, sslTrustCertificateKeyStoreUrl, sslTrustCertificateKeyStorePassword, sslTrustCertificateKeyStoreType, - sslTrustAll, sslFactory + sslTrustAll, sslFactory, + stream, streamAllowOverwrite, streamParOps, streamBufSize, streamFlushFreq, streamBatchSize }; /** {@inheritDoc} */ @@ -387,6 +419,66 @@ this.sslFactory.setValue(sslFactory); } + /** {@inheritDoc} */ + @Override public boolean isStream() { + return stream.value(); + } + + /** {@inheritDoc} */ + @Override public void setStream(boolean val) { + stream.setValue(val); + } + + /** {@inheritDoc} */ + @Override public boolean isStreamAllowOverwrite() { + return streamAllowOverwrite.value(); + } + + /** {@inheritDoc} */ + @Override public void setStreamAllowOverwrite(boolean val) { + streamAllowOverwrite.setValue(val); + } + + /** {@inheritDoc} */ + @Override public int getStreamParallelOperations() { + return streamParOps.value(); + } + + /** {@inheritDoc} */ + @Override public void setStreamParallelOperations(int val) throws SQLException { + streamParOps.setValue(val); + } + + /** {@inheritDoc} */ + @Override public int getStreamBufferSize() { + return streamBufSize.value(); + } + + /** {@inheritDoc} */ + @Override public void setStreamBufferSize(int val) throws SQLException { + streamBufSize.setValue(val); + } + + /** {@inheritDoc} */ + @Override public long getStreamFlushFrequency() { + return streamFlushFreq.value(); + } + + /** {@inheritDoc} */ + @Override public void setStreamFlushFrequency(long val) throws SQLException { + streamFlushFreq.setValue(val); + } + + /** {@inheritDoc} */ + @Override public int getStreamBatchSize() { + return streamBatchSize.value(); + } + + /** {@inheritDoc} */ + @Override public void setStreamBatchSize(int val) throws SQLException { + streamBatchSize.setValue(val); + } + /** * @param props Environment properties. * @throws SQLException On error. @@ -394,7 +486,7 @@ void init(Properties props) throws SQLException { Properties props0 = (Properties)props.clone(); - for (ConnectionProperty aPropsArray : propsArray) + for (ConnectionProperty aPropsArray : this.props) aPropsArray.init(props0); } @@ -402,10 +494,10 @@ * @return Driver's properties info array. */ private DriverPropertyInfo[] getDriverPropertyInfo() { - DriverPropertyInfo[] dpis = new DriverPropertyInfo[propsArray.length]; + DriverPropertyInfo[] dpis = new DriverPropertyInfo[props.length]; - for (int i = 0; i < propsArray.length; ++i) - dpis[i] = propsArray[i].getDriverPropertyInfo(); + for (int i = 0; i < props.length; ++i) + dpis[i] = props[i].getDriverPropertyInfo(); return dpis; } @@ -722,7 +814,8 @@ else { try { setValue(parse(str)); - } catch (NumberFormatException e) { + } + catch (NumberFormatException e) { throw new SQLException("Failed to parse int property [name=" + name + ", value=" + str + ']', SqlStateCode.CLIENT_CONNECTION_FAILED); } @@ -794,6 +887,38 @@ } } + /** + * + */ + private static class LongProperty extends NumberProperty { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param name Name. + * @param desc Description. + * @param dfltVal Default value. 
+ * @param required {@code true} if the property is required. + * @param min Lower bound of allowed range. + * @param max Upper bound of allowed range. + */ + LongProperty(String name, String desc, Number dfltVal, boolean required, long min, long max) { + super(name, desc, dfltVal, required, min, max); + } + + /** {@inheritDoc} */ + @Override protected Number parse(String str) throws NumberFormatException { + return Long.parseLong(str); + } + + /** + * @return Property value. + */ + long value() { + return val.longValue(); + } + } + /** * */ Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java (date 1519034013000) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java (revision ) @@ -385,6 +385,7 @@ /** * Perform given statement against given data streamer. Only rows based INSERT is supported. * + * @param schemaName Schema name. * @param streamer Streamer to feed data to. * @param stmt Statement. * @param args Statement arguments. @@ -392,81 +393,74 @@ * @throws IgniteCheckedException if failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) + long streamUpdateQuery(String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) throws IgniteCheckedException { + idx.checkStatementStreamable(stmt); + Prepared p = GridSqlQueryParser.prepared(stmt); assert p != null; - final UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null); + final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null); - if (!F.eq(streamer.cacheName(), plan.cacheContext().name())) - throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" + - " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - - if (plan.mode() == UpdateMode.INSERT && plan.rowCount() > 0) { - assert plan.isLocalSubquery(); + assert plan.isLocalSubquery(); - final GridCacheContext cctx = plan.cacheContext(); + final GridCacheContext cctx = plan.cacheContext(); - QueryCursorImpl> cur; + QueryCursorImpl> cur; - final ArrayList> data = new ArrayList<>(plan.rowCount()); + final ArrayList> data = new ArrayList<>(plan.rowCount()); - QueryCursorImpl> stepCur = new QueryCursorImpl<>(new Iterable>() { - @Override public Iterator> iterator() { - try { - Iterator> it; + QueryCursorImpl> stepCur = new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + try { + Iterator> it; - if (!F.isEmpty(plan.selectQuery())) { - GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), - plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), - null, false, 0, null); + if (!F.isEmpty(plan.selectQuery())) { + GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), + plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), + null, false, 0, null); - it = res.iterator(); - } - else - it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator(); + it = res.iterator(); + } + else + it = plan.createRows(U.firstNotNull(args, 
X.EMPTY_OBJECT_ARRAY)).iterator(); - return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary()); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - }, null); + return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }, null); - data.addAll(stepCur.getAll()); + data.addAll(stepCur.getAll()); - cur = new QueryCursorImpl<>(new Iterable>() { - @Override public Iterator> iterator() { - return data.iterator(); - } - }, null); + cur = new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + return data.iterator(); + } + }, null); - if (plan.rowCount() == 1) { - IgniteBiTuple t = plan.processRow(cur.iterator().next()); + if (plan.rowCount() == 1) { + IgniteBiTuple t = plan.processRow(cur.iterator().next()); - streamer.addData(t.getKey(), t.getValue()); + streamer.addData(t.getKey(), t.getValue()); - return 1; - } + return 1; + } - Map rows = new LinkedHashMap<>(plan.rowCount()); + Map rows = new LinkedHashMap<>(plan.rowCount()); - for (List row : cur) { - final IgniteBiTuple t = plan.processRow(row); + for (List row : cur) { + final IgniteBiTuple t = plan.processRow(row); - rows.put(t.getKey(), t.getValue()); - } + rows.put(t.getKey(), t.getValue()); + } - streamer.addData(rows); + streamer.addData(rows); - return rows.size(); - } - else - throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode", - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + return rows.size(); } /** @@ -519,7 +513,7 @@ .setPageSize(fieldsQry.getPageSize()) .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl>)idx.querySqlFields(schemaName, newFieldsQry, true, true, + cur = (QueryCursorImpl>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true, cancel).get(0); } else if (plan.hasRows()) @@ -610,7 +604,7 @@ * @return Update plan. 
*/ @SuppressWarnings({"unchecked", "ConstantConditions"}) - private UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry, + UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry, boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException { H2CachedStatementKey planKey = H2CachedStatementKey.forDmlStatement(schema, p.getSQL(), fieldsQry, loc); Index: modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java (revision ) @@ -262,13 +262,19 @@ @Override public void addBatch() throws SQLException { ensureNotClosed(); - if (batch == null) { - batch = new ArrayList<>(); + batchSize++; + + if (conn.isStream()) + conn.addBatch(sql, args); + else { + if (batch == null) { + batch = new ArrayList<>(); - batch.add(new JdbcQuery(sql, args.toArray(new Object[args.size()]))); - } - else - batch.add(new JdbcQuery(null, args.toArray(new Object[args.size()]))); + batch.add(new JdbcQuery(sql, args.toArray(new Object[args.size()]))); + } + else + batch.add(new JdbcQuery(null, args.toArray(new Object[args.size()]))); + } args = null; } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java (revision ) @@ -76,33 +76,46 @@ * Detect whether SQL query should be executed in distributed or local manner and execute it. * @param schemaName Schema name. * @param qry Query. + * @param cliCtx Client context. * @param keepBinary Keep binary flag. * @param failOnMultipleStmts Whether an exception should be thrown for multiple statements query. - * @param cancel Query cancel state handler. - * @return Cursor. + * @param cancel Query cancel state handler. + * @return Cursor. */ - public List>> querySqlFields(String schemaName, SqlFieldsQuery qry, boolean keepBinary, - boolean failOnMultipleStmts, GridQueryCancel cancel); + public List>> querySqlFields(String schemaName, SqlFieldsQuery qry, + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel); /** - * Perform a MERGE statement using data streamer as receiver. + * Execute an INSERT statement using data streamer as receiver. * * @param schemaName Schema name. * @param qry Query. * @param params Query parameters. * @param streamer Data streamer to feed data to. - * @return Query result. + * @return Update counter. * @throws IgniteCheckedException If failed. */ public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params, IgniteDataStreamer streamer) throws IgniteCheckedException; + /** + * Execute a batched INSERT statement using data streamer as receiver. + * + * @param schemaName Schema name. + * @param qry Query. + * @param params Query parameters. 
+ * @param cliCtx Client connection context. + * @return Update counters. + * @throws IgniteCheckedException If failed. + */ + public List streamBatchedUpdateQuery(String schemaName, String qry, List params, + SqlClientContext cliCtx) throws IgniteCheckedException; + /** * Executes regular query. * * @param schemaName Schema name. * @param cacheName Cache name. - *@param qry Query. + * @param qry Query. * @param filter Cache name and key filter. * @param keepBinary Keep binary flag. @return Cursor. */ @@ -313,12 +326,11 @@ public String schema(String cacheName); /** - * Check if passed statement is insert statemtn. + * Check if passed statement is insert statement eligible for streaming, throw an {@link IgniteSQLException} if not. * * @param nativeStmt Native statement. - * @return {@code True} if insert. */ - public boolean isInsertStatement(PreparedStatement nativeStmt); + public void checkStatementStreamable(PreparedStatement nativeStmt); /** * Return row cache cleaner. Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java (revision ) @@ -629,7 +629,7 @@ boolean keepBinary = opCtxCall != null && opCtxCall.isKeepBinary(); - return ctx.kernalContext().query().querySqlFields(ctx, qry, keepBinary, false); + return ctx.kernalContext().query().querySqlFields(ctx, qry, null, keepBinary, false); } catch (Exception e) { if (e instanceof CacheException) @@ -662,7 +662,7 @@ if (qry instanceof SqlFieldsQuery) return (FieldsQueryCursor)ctx.kernalContext().query().querySqlFields(ctx, (SqlFieldsQuery)qry, - keepBinary, true).get(0); + null, keepBinary, true).get(0); if (qry instanceof ScanQuery) return query((ScanQuery)qry, null, projection(qry.isLocal())); Index: modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java (date 1519034013000) +++ modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java (revision ) @@ -20,16 +20,24 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; import java.util.Collections; import java.util.Properties; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteJdbcDriver; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import 
org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX; @@ -41,10 +49,12 @@ */ public class JdbcStreamingSelfTest extends GridCommonAbstractTest { /** JDBC URL. */ - private static final String BASE_URL = CFG_URL_PREFIX + "cache=default@modules/clients/src/test/config/jdbc-config.xml"; + private static final String BASE_URL = CFG_URL_PREFIX + + "cache=default@modules/clients/src/test/config/jdbc-config.xml"; - /** Connection. */ - protected Connection conn; + /** Streaming URL. */ + private static final String STREAMING_URL = CFG_URL_PREFIX + + "cache=person@modules/clients/src/test/config/jdbc-config.xml"; /** */ protected transient IgniteLogger log; @@ -90,7 +100,18 @@ /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + startGrids(2); + + try (Connection c = createOrdinaryConnection()) { + try (Statement s = c.createStatement()) { + s.execute("CREATE TABLE PUBLIC.Person(\"id\" int primary key, \"name\" varchar) WITH " + + "\"cache_name=person,value_type=Person\""); + } + } + + U.sleep(1000); } /** {@inheritDoc} */ @@ -98,28 +119,52 @@ stopAllGrids(); } + /** + * @return Connection without streaming initially turned on. + * @throws SQLException if failed. + */ + protected Connection createOrdinaryConnection() throws SQLException { + Connection res = DriverManager.getConnection(BASE_URL, new Properties()); + + res.setSchema(QueryUtils.DFLT_SCHEMA); + + return res; + } + /** * @param allowOverwrite Allow overwriting of existing keys. * @return Connection to use for the test. * @throws Exception if failed. */ - private Connection createConnection(boolean allowOverwrite) throws Exception { + protected Connection createStreamedConnection(boolean allowOverwrite) throws Exception { + return createStreamedConnection(allowOverwrite, 500); + } + + /** + * @param allowOverwrite Allow overwriting of existing keys. + * @param flushTimeout Stream flush timeout. + * @return Connection to use for the test. + * @throws Exception if failed. + */ + protected Connection createStreamedConnection(boolean allowOverwrite, long flushTimeout) throws Exception { Properties props = new Properties(); props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true"); - props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500"); + props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, String.valueOf(flushTimeout)); if (allowOverwrite) props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true"); - return DriverManager.getConnection(BASE_URL, props); + Connection res = DriverManager.getConnection(STREAMING_URL, props); + + res.setSchema(QueryUtils.DFLT_SCHEMA); + + return res; } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - U.closeQuiet(conn); - - ignite(0).cache(DEFAULT_CACHE_NAME).clear(); + cache().clear(); super.afterTest(); } @@ -128,30 +173,59 @@ * @throws Exception if failed. 
*/ public void testStreamedInsert() throws Exception { - conn = createConnection(false); - for (int i = 10; i <= 100; i += 10) - ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100); + put(i, nameForId(i * 100)); - PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)"); - - for (int i = 1; i <= 100; i++) { - stmt.setInt(1, i); - stmt.setInt(2, i); + try (Connection conn = createStreamedConnection(false)) { + try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") " + + "values (?, ?)")) { + for (int i = 1; i <= 100; i++) { + stmt.setInt(1, i); + stmt.setString(2, nameForId(i)); - stmt.executeUpdate(); + stmt.executeUpdate(); + } + } } - // Closing connection makes it wait for streamer close - // and thus for data load completion as well - conn.close(); + U.sleep(500); // Now let's check it's all there. for (int i = 1; i <= 100; i++) { if (i % 10 != 0) - assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i)); + assertEquals(nameForId(i), nameForIdInCache(i)); else // All that divides by 10 evenly should point to numbers 100 times greater - see above - assertEquals(i * 100, grid(0).cache(DEFAULT_CACHE_NAME).get(i)); + assertEquals(nameForId(i * 100), nameForIdInCache(i)); + } + } + + /** + * @throws Exception if failed. + */ + public void testStreamedInsertWithoutColumnsList() throws Exception { + for (int i = 10; i <= 100; i += 10) + put(i, nameForId(i * 100)); + + try (Connection conn = createStreamedConnection(false)) { + try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person values (?, ?)")) { + for (int i = 1; i <= 100; i++) { + stmt.setInt(1, i); + stmt.setString(2, nameForId(i)); + + stmt.executeUpdate(); + } + } + } + + U.sleep(500); + + // Now let's check it's all there. + for (int i = 1; i <= 100; i++) { + if (i % 10 != 0) + assertEquals(nameForId(i), nameForIdInCache(i)); + else // All that divides by 10 evenly should point to numbers 100 times greater - see above + assertEquals(nameForId(i * 100), nameForIdInCache(i)); } } @@ -159,27 +233,99 @@ * @throws Exception if failed. */ public void testStreamedInsertWithOverwritesAllowed() throws Exception { - conn = createConnection(true); - for (int i = 10; i <= 100; i += 10) - ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100); + put(i, nameForId(i * 100)); - PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)"); - - for (int i = 1; i <= 100; i++) { - stmt.setInt(1, i); - stmt.setInt(2, i); + try (Connection conn = createStreamedConnection(true)) { + try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") " + + "values (?, ?)")) { + for (int i = 1; i <= 100; i++) { + stmt.setInt(1, i); + stmt.setString(2, nameForId(i)); - stmt.executeUpdate(); + stmt.executeUpdate(); + } + } } - // Closing connection makes it wait for streamer close - // and thus for data load completion as well - conn.close(); + U.sleep(500); // Now let's check it's all there. // i should point to i at all times as we've turned overwrites on above. 
for (int i = 1; i <= 100; i++) - assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i)); + assertEquals(nameForId(i), nameForIdInCache(i)); + } + + /** */ + public void testOnlyInsertsAllowed() { + assertStatementForbidden("CREATE TABLE PUBLIC.X (x int primary key, y int)"); + + assertStatementForbidden("SELECT * from Person"); + + assertStatementForbidden("insert into PUBLIC.Person(\"id\", \"name\") " + + "(select \"id\" + 1, CONCAT(\"name\", '1') from Person)"); + + assertStatementForbidden("DELETE from Person"); + + assertStatementForbidden("UPDATE Person SET \"name\" = 'name0'"); + + assertStatementForbidden("alter table Person add column y int"); + } + + /** + * @param sql Statement to check. + */ + @SuppressWarnings("ThrowableNotThrown") + protected void assertStatementForbidden(String sql) { + GridTestUtils.assertThrows(null, new IgniteCallable() { + @Override public Object call() throws Exception { + try (Connection c = createStreamedConnection(false)) { + try (PreparedStatement s = c.prepareStatement(sql)) { + s.execute(); + } + } + + return null; + } + }, SQLException.class,"Streaming mode supports only INSERT commands without subqueries."); + } + + /** + * @return Person cache. + */ + protected IgniteCache cache() { + return grid(0).cache("person"); + } + + /** + * @param id id of person to put. + * @param name name of person to put. + */ + protected void put(int id, String name) { + BinaryObjectBuilder bldr = grid(0).binary().builder("Person"); + + bldr.setField("name", name); + + cache().put(id, bldr.build()); + } + + /** + * @param id Person id. + * @return Default name for person w/given id. + */ + protected String nameForId(int id) { + return "Person" + id; + } + + /** + * @param id person id. + * @return Name for person with given id currently stored in cache. + */ + protected String nameForIdInCache(int id) { + Object o = cache().withKeepBinary().get(id); + + assertTrue(String.valueOf(o), o instanceof BinaryObject); + + return ((BinaryObject)o).field("name"); } } Index: modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java (revision ) @@ -79,6 +79,9 @@ /** Result set holdability*/ private final int resHoldability; + /** Batch size to keep track of number of items to return as fake update counters for executeBatch. */ + protected int batchSize; + /** Batch. */ protected List batch; @@ -133,6 +136,19 @@ if (sql == null || sql.isEmpty()) throw new SQLException("SQL query is empty."); + if (conn.isStream()) { + if (stmtType == JdbcStatementType.SELECT_STATEMENT_TYPE) + throw new SQLException("executeQuery() method is not allowed in streaming mode.", + SqlStateCode.INTERNAL_ERROR, + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + conn.addBatch(sql, args); + + resultSets = Collections.singletonList(resultSetForUpdate(0)); + + return; + } + JdbcResult res0 = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, schema, pageSize, maxRows, sql, args == null ? 
null : args.toArray(new Object[args.size()]))); @@ -158,11 +174,8 @@ boolean firstRes = true; for(JdbcResultInfo rsInfo : resInfos) { - if (!rsInfo.isQuery()) { - resultSets.add(new JdbcThinResultSet(this, -1, pageSize, - true, Collections.>emptyList(), false, - conn.autoCloseServerCursor(), rsInfo.updateCount(), closeOnCompletion)); - } + if (!rsInfo.isQuery()) + resultSets.add(resultSetForUpdate(rsInfo.updateCount())); else { if (firstRes) { firstRes = false; @@ -185,6 +198,16 @@ assert resultSets.size() > 0 : "At least one results set is expected"; } + /** + * @param cnt Update counter. + * @return Result set for given update counter. + */ + private JdbcThinResultSet resultSetForUpdate(long cnt) { + return new JdbcThinResultSet(this, -1, pageSize, + true, Collections.>emptyList(), false, + conn.autoCloseServerCursor(), cnt, closeOnCompletion); + } + /** * Sends a file to server in batches via multiple {@link JdbcBulkLoadBatchRequest}s. * @@ -469,6 +492,14 @@ @Override public void addBatch(String sql) throws SQLException { ensureNotClosed(); + batchSize++; + + if (conn.isStream()) { + conn.addBatch(sql, null); + + return; + } + if (batch == null) batch = new ArrayList<>(); @@ -479,6 +510,8 @@ @Override public void clearBatch() throws SQLException { ensureNotClosed(); + batchSize = 0; + batch = null; } @@ -488,6 +521,14 @@ closeResults(); + if (conn.isStream()) { + int[] res = new int[batchSize]; + + batchSize = 0; + + return res; + } + if (batch == null || batch.isEmpty()) throw new SQLException("Batch is empty."); @@ -502,6 +543,8 @@ return res.updateCounts(); } finally { + batchSize = 0; + batch = null; } } Index: modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java (revision ) @@ -62,8 +62,11 @@ /** Version 2.4.0. */ private static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0); + /** Version 2.5.0. */ + private static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0); + /** Current version. */ - private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_4_0; + private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_5_0; /** Initial output stream capacity for handshake. 
*/ private static final int HANDSHAKE_MSG_SIZE = 13; @@ -265,6 +268,11 @@ writer.writeBoolean(connProps.isAutoCloseServerCursor()); writer.writeBoolean(connProps.isLazy()); writer.writeBoolean(connProps.isSkipReducerOnUpdate()); + writer.writeBoolean(connProps.isStream()); + writer.writeBoolean(connProps.isStreamAllowOverwrite()); + writer.writeInt(connProps.getStreamParallelOperations()); + writer.writeInt(connProps.getStreamBufferSize()); + writer.writeLong(connProps.getStreamFlushFrequency()); send(writer.array()); @@ -298,7 +306,8 @@ ClientListenerProtocolVersion srvProtocolVer = ClientListenerProtocolVersion.create(maj, min, maintenance); - if (VER_2_3_0.equals(srvProtocolVer) || VER_2_1_5.equals(srvProtocolVer)) + if (VER_2_4_0.equals(srvProtocolVer) || VER_2_3_0.equals(srvProtocolVer) || + VER_2_1_5.equals(srvProtocolVer)) handshake(srvProtocolVer); else if (VER_2_1_0.equals(srvProtocolVer)) handshake_2_1_0(); Index: modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java (date 1519034013000) +++ modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java (revision ) @@ -299,7 +299,7 @@ call.call(); } catch (Throwable e) { - if (cls != e.getClass()) { + if (cls != e.getClass() && !cls.isAssignableFrom(e.getClass())) { if (e.getClass() == CacheException.class && e.getCause() != null && e.getCause().getClass() == cls) e = e.getCause(); else { Index: modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java (revision ) @@ -28,7 +28,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; /** - * ODBC Connection Context. + * JDBC Connection Context. */ public class JdbcConnectionContext implements ClientListenerConnectionContext { /** Version 2.1.0. */ @@ -38,13 +38,16 @@ private static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5); /** Version 2.3.1: added "multiple statements query" feature. */ - public static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0); + static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0); /** Version 2.4.0: adds default values for columns feature. */ - public static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0); + static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0); + + /** Version 2.5.0: adds streaming via thin connection. */ + static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0); /** Current version. */ - private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_4_0; + private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_5_0; /** Supported versions. 
*/ private static final Set SUPPORTED_VERS = new HashSet<>(); @@ -66,6 +69,7 @@ static { SUPPORTED_VERS.add(CURRENT_VER); + SUPPORTED_VERS.add(VER_2_4_0); SUPPORTED_VERS.add(VER_2_3_0); SUPPORTED_VERS.add(VER_2_1_5); SUPPORTED_VERS.add(VER_2_1_0); @@ -113,8 +117,23 @@ if (ver.compareTo(VER_2_3_0) >= 0) skipReducerOnUpdate = reader.readBoolean(); + boolean stream = false; + boolean streamAllowOverwrites = false; + int streamParOps = 0; + int streamBufSize = 0; + long streamFlushFreq = 0; + + if (ver.compareTo(VER_2_5_0) >= 0) { + stream = reader.readBoolean(); + streamAllowOverwrites = reader.readBoolean(); + streamParOps = reader.readInt(); + streamBufSize = reader.readInt(); + streamFlushFreq = reader.readLong(); + } + handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder, - collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, ver); + collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, stream, streamAllowOverwrites, + streamParOps, streamBufSize, streamFlushFreq, ver); parser = new JdbcMessageParser(ctx); } Index: modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java (revision ) @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.processors.query.GridQueryIndexing; +import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -612,10 +613,11 @@ PreparedStatement nativeStmt = prepareNativeStatement(sql); - if (!idx.isInsertStatement(nativeStmt)) { - throw new SQLException("Only INSERT operations are supported in streaming mode", - SqlStateCode.INTERNAL_ERROR, - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + try { + idx.checkStatementStreamable(nativeStmt); + } + catch (IgniteSQLException e) { + throw e.toJdbcException(); } IgniteDataStreamer streamer = ignite().dataStreamer(cacheName); Index: modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java (date 1519034013000) +++ modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java (revision ) @@ -351,4 +351,70 @@ * @param sslFactory Custom class name that implements Factory<SSLSocketFactory>. */ public void setSslFactory(String sslFactory); + + /** + * @return Streamed connection flag. + */ + public boolean isStream(); + + /** + * @param stream Streamed connection flag. + */ + public void setStream(boolean stream); + + /** + * @return Allow overwrites during streaming connection flag. 
+ */ + public boolean isStreamAllowOverwrite(); + + /** + * @param streamAllowOverwrite Allow overwrites during streaming connection flag. + */ + public void setStreamAllowOverwrite(boolean streamAllowOverwrite); + + /** + * @return Number of parallel operations per node during streaming connection param. + */ + public int getStreamParallelOperations(); + + /** + * @param streamParallelOperations Number of parallel operations per node during streaming connection param. + * @throws SQLException if value check failed. + */ + public void setStreamParallelOperations(int streamParallelOperations) throws SQLException; + + /** + * @return Buffer size during streaming connection param. + */ + public int getStreamBufferSize(); + + /** + * @param streamBufSize Buffer size during streaming connection param. + * @throws SQLException if value check failed. + */ + public void setStreamBufferSize(int streamBufSize) throws SQLException; + + /** + * @return Flush timeout during streaming connection param. + */ + public long getStreamFlushFrequency(); + + /** + * @param streamFlushFreq Flush timeout during streaming connection param. + * @throws SQLException if value check failed. + */ + public void setStreamFlushFrequency(long streamFlushFreq) throws SQLException; + + /** + * @return Batch size for streaming (number of commands to accumulate internally before actually + * sending over the wire). + */ + public int getStreamBatchSize(); + + /** + * @param streamBatchSize Batch size for streaming (number of commands to accumulate internally before actually + * sending over the wire). + * @throws SQLException if value check failed. + */ + public void setStreamBatchSize(int streamBatchSize) throws SQLException; }
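
Reviewer note, not part of the patch: below is a minimal usage sketch of how the thin-driver streaming properties introduced above are expected to be used from client code. The host, table, column names and parameter values are illustrative assumptions (the Person table mirrors the one created in JdbcStreamingSelfTest), and the ?param=value URL syntax is assumed to match the thin driver's URL parser; the property names themselves are taken verbatim from the props array in ConnectionPropertiesImpl.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ThinStreamingSketch {
    public static void main(String[] args) throws Exception {
        // streaming=true switches the connection into streaming mode; the remaining
        // parameters map onto the IgniteDataStreamer knobs wired up by this patch.
        // Host and parameter values are assumptions for illustration only.
        String url = "jdbc:ignite:thin://127.0.0.1?streaming=true"
            + "&streamingAllowOverwrite=true"     // let INSERTs overwrite existing keys
            + "&streamingFlushFrequency=500"      // flush streamer buffers every 500 ms
            + "&streamingBatchSize=2048";         // commands accumulated per wire batch

        try (Connection conn = DriverManager.getConnection(url);
            PreparedStatement stmt = conn.prepareStatement(
                "insert into PUBLIC.Person(\"id\", \"name\") values (?, ?)")) {
            for (int i = 1; i <= 100; i++) {
                stmt.setInt(1, i);
                stmt.setString(2, "Person" + i);

                // In streaming mode this does not execute per row: per the
                // JdbcThinStatement changes above, the command is routed to
                // conn.addBatch(sql, args) and a fake update count of 0 is returned.
                stmt.executeUpdate();
            }
        } // Closing the connection is expected to flush the server-side streamer.
    }
}

Note the design choice this sketch illustrates: only plain, subquery-free INSERTs are accepted on such a connection (checkStatementStreamable), and update counts returned while streaming are placeholders, since rows are applied asynchronously by the data streamer.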