Index: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java (revision ) @@ -64,6 +64,18 @@ public GridCursor find(L lower, L upper) throws IgniteCheckedException; /** + * Returns a cursor from lower to upper bounds inclusive. + * + * @param lower Lower bound or {@code null} if unbounded. + * @param upper Upper bound or {@code null} if unbounded. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached + * data row. + * @return Cursor. + * @throws IgniteCheckedException If failed. + */ + public GridCursor find(L lower, L upper, Object x) throws IgniteCheckedException; + + /** * Returns a value mapped to the lowest key, or {@code null} if tree is empty * @return Value. * @throws IgniteCheckedException If failed. Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java (revision ) @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; @@ -29,8 +30,11 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO; +import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2FilteredRow; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.h2.result.SearchRow; import org.h2.table.IndexColumn; import org.h2.value.Value; @@ -118,8 +122,19 @@ } /** {@inheritDoc} */ - @Override protected GridH2Row getRow(BPlusIO io, long pageAddr, int idx, Object ignore) + @Override protected GridH2Row getRow(BPlusIO io, long pageAddr, int idx, Object filter) throws IgniteCheckedException { + if (filter != null) { + long link = ((H2RowLinkIO)io).getLink(pageAddr, idx); + + int part = PageIdUtils.partId(PageIdUtils.pageId(link)); + + IndexingQueryCacheFilter filter0 = (IndexingQueryCacheFilter)filter; + + if (!filter0.applyPartition(part)) + return new GridH2FilteredRow(part); + } + return (GridH2Row)io.getLookupRow(this, pageAddr, idx); } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java (revision ) @@ -928,23 +928,12 @@ throw new IllegalStateException("Tree is being concurrently destroyed: " + getName()); } - /** - * @param lower Lower bound inclusive or {@code null} if unbounded. - * @param upper Upper bound inclusive or {@code null} if unbounded. - * @return Cursor. - * @throws IgniteCheckedException If failed. - */ + /** {@inheritDoc} */ @Override public GridCursor find(L lower, L upper) throws IgniteCheckedException { return find(lower, upper, null); } - /** - * @param lower Lower bound inclusive or {@code null} if unbounded. - * @param upper Upper bound inclusive or {@code null} if unbounded. - * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. - * @return Cursor. - * @throws IgniteCheckedException If failed. - */ + /** {@inheritDoc} */ public final GridCursor find(L lower, L upper, Object x) throws IgniteCheckedException { checkDestroyed(); Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java (revision ) @@ -178,7 +178,7 @@ H2Tree tree = treeForRead(seg); - return new H2Cursor(tree.find(lower, upper), p); + return new H2Cursor(tree.find(lower, upper, p)); } catch (IgniteCheckedException e) { throw DbException.convert(e); @@ -282,6 +282,7 @@ H2Tree tree = treeForRead(seg); + // TODO: Do we filter backups here? Looks like a bug! GridH2Row row = b ? tree.findFirst(): tree.findLast(); return new SingleRowCursor(row); @@ -318,19 +319,27 @@ } /** {@inheritDoc} */ - @Override protected GridCursor doFind0( + @Override protected H2Cursor doFind0( IgniteTree t, @Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last, IndexingQueryFilter filter) { try { - GridCursor range = t.find(first, last); + IndexingQueryCacheFilter p = null; + if (filter != null) { + String cacheName = getTable().cacheName(); + + p = filter.forCache(cacheName); + } + + GridCursor range = t.find(first, last, p); + if (range == null) - return EMPTY_CURSOR; + range = EMPTY_CURSOR; - return filter(range, filter); + return new H2Cursor(range); } catch (IgniteCheckedException e) { throw DbException.convert(e); Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java (revision ) @@ -60,12 +60,7 @@ * @param name Index name. * @param colsList Index columns. */ - public H2PkHashIndex( - GridCacheContext cctx, - GridH2Table tbl, - String name, - List colsList - ) { + public H2PkHashIndex(GridCacheContext cctx, GridH2Table tbl, String name, List colsList) { IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]); @@ -216,6 +211,7 @@ if (filter == null) return true; + // TODO: Do not materialize here as well! CacheDataRow dataRow = cursor.get(); if (filter.applyPartition(dataRow.partition())) Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java (revision ) @@ -33,7 +33,7 @@ /** * Inner page for H2 row references. */ -public class H2ExtrasInnerIO extends BPlusInnerIO { +public class H2ExtrasInnerIO extends BPlusInnerIO implements H2RowLinkIO { /** Payload size. */ private final int payloadSize; @@ -129,12 +129,8 @@ PageUtils.putLong(dstPageAddr, dstOff + payloadSize, link); } - /** - * @param pageAddr Page address. - * @param idx Index. - * @return Link to row. - */ - private long getLink(long pageAddr, int idx) { + /** {@inheritDoc} */ + @Override public long getLink(long pageAddr, int idx) { return PageUtils.getLong(pageAddr, offset(idx) + payloadSize); } } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java (revision ) @@ -33,7 +33,7 @@ /** * Leaf page for H2 row references. */ -public class H2ExtrasLeafIO extends BPlusLeafIO { +public class H2ExtrasLeafIO extends BPlusLeafIO implements H2RowLinkIO { /** Payload size. */ private final int payloadSize; @@ -126,12 +126,8 @@ return ((H2Tree)tree).getRowFactory().getRow(link); } - /** - * @param pageAddr Page address. - * @param idx Index. - * @return Link to row. - */ - private long getLink(long pageAddr, int idx) { + /** {@inheritDoc} */ + @Override public long getLink(long pageAddr, int idx) { return PageUtils.getLong(pageAddr, offset(idx) + payloadSize); } } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java (revision ) @@ -28,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.query.h2.H2Cursor; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; @@ -46,7 +47,6 @@ import org.apache.ignite.logger.NullLogger; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.indexing.IndexingQueryFilter; -import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.h2.engine.Session; import org.h2.index.BaseIndex; import org.h2.index.Cursor; @@ -73,7 +73,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; @@ -261,17 +260,6 @@ } /** - * Filters rows from expired ones and using predicate. - * - * @param cursor GridCursor over rows. - * @param filter Optional filter. - * @return Filtered iterator. - */ - protected GridCursor filter(GridCursor cursor, IndexingQueryFilter filter) { - return new FilteringCursor(cursor, U.currentTimeMillis(), filter, getTable().cacheName()); - } - - /** * @return Filter for currently running query or {@code null} if none. */ protected static IndexingQueryFilter threadLocalFilter() { @@ -1570,7 +1558,7 @@ * @param filter Filter. * @return Iterator over rows in given range. */ - protected GridCursor doFind0( + protected H2Cursor doFind0( IgniteTree t, @Nullable SearchRow first, boolean includeFirst, @@ -1580,77 +1568,11 @@ } /** - * Cursor which filters by expiration time and predicate. - */ - protected static class FilteringCursor implements GridCursor { - /** */ - private final GridCursor cursor; - - /** */ - private final IndexingQueryCacheFilter fltr; - - /** */ - private final long time; - - /** */ - private GridH2Row next; - - /** - * @param cursor GridCursor. - * @param time Time for expired rows filtering. - * @param qryFilter Filter. - * @param cacheName Cache name. - */ - protected FilteringCursor(GridCursor cursor, long time, IndexingQueryFilter qryFilter, - String cacheName) { - this.cursor = cursor; - this.time = time; - this.fltr = qryFilter != null ? qryFilter.forCache(cacheName) : null; - } - - /** - * @param row Row. - * @return If this row was accepted. - */ - @SuppressWarnings({"unchecked", "SimplifiableIfStatement"}) - protected boolean accept(GridH2Row row) { - if (row.expireTime() != 0 && row.expireTime() <= time) - return false; - - return fltr == null || fltr.applyPartition(row.partition()); - } - - /** {@inheritDoc} */ - @Override public boolean next() throws IgniteCheckedException { - next = null; - - while (cursor.next()) { - GridH2Row t = cursor.get(); - - if (accept(t)) { - next = t; - return true; - } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public GridH2Row get() throws IgniteCheckedException { - if (next == null) - throw new NoSuchElementException(); - - return next; - } - } - - /** * */ private static final class CursorIteratorWrapper implements Iterator { /** */ - private final GridCursor cursor; + private final H2Cursor cursor; /** Next element. */ private GridH2Row next; @@ -1658,19 +1580,14 @@ /** * @param cursor Cursor. */ - private CursorIteratorWrapper(GridCursor cursor) { + private CursorIteratorWrapper(H2Cursor cursor) { assert cursor != null; this.cursor = cursor; - try { - if (cursor.next()) + if (cursor.next()) - next = cursor.get(); + next = (GridH2Row)cursor.get(); - } + } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } /** {@inheritDoc} */ @Override public boolean hasNext() { @@ -1679,19 +1596,14 @@ /** {@inheritDoc} */ @Override public GridH2Row next() { - try { - GridH2Row res = next; + GridH2Row res = next; - if (cursor.next()) + if (cursor.next()) - next = cursor.get(); + next = (GridH2Row)cursor.get(); - else - next = null; + else + next = null; - return res; + return res; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } } /** {@inheritDoc} */ Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2FilteredRow.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2FilteredRow.java (revision ) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2FilteredRow.java (revision ) @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import org.h2.value.Value; + +/** + * Row which was filtered out. + */ +public class GridH2FilteredRow extends GridH2Row { + /** Partition ID. */ + private final int partId; + + /** + * Constructor. + * + * @param partId Partition. + */ + public GridH2FilteredRow(int partId) { + super(null); + + this.partId = partId; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return partId; + } + + /** {@inheritDoc} */ + @Override public long expireTime() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int getColumnCount() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public Value getValue(int index) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setValue(int index, Value v) { + throw new UnsupportedOperationException(); + } +} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java (revision ) @@ -18,10 +18,10 @@ package org.apache.ignite.internal.processors.query.h2; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2FilteredRow; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.h2.index.Cursor; import org.h2.message.DbException; import org.h2.result.Row; @@ -35,29 +35,17 @@ private final GridCursor cursor; /** */ - private final IndexingQueryCacheFilter filter; - - /** */ private final long time = U.currentTimeMillis(); /** * @param cursor Cursor. - * @param filter Filter. */ - public H2Cursor(GridCursor cursor, IndexingQueryCacheFilter filter) { + public H2Cursor(GridCursor cursor) { assert cursor != null; this.cursor = cursor; - this.filter = filter; } - /** - * @param cursor Cursor. - */ - public H2Cursor(GridCursor cursor) { - this(cursor, null); - } - /** {@inheritDoc} */ @Override public Row get() { try { @@ -79,11 +67,13 @@ while (cursor.next()) { GridH2Row row = cursor.get(); + if (row instanceof GridH2FilteredRow) + continue; + if (row.expireTime() > 0 && row.expireTime() <= time) continue; - if (filter == null || filter.applyPartition(row.partition())) - return true; + return true; } return false; Index: modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java (revision a15b45bdfae738145ac25f9e3e9600a85aa607cb) +++ modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java (revision ) @@ -32,6 +32,8 @@ import org.apache.ignite.internal.processors.query.h2.H2Cursor; import org.apache.ignite.internal.util.GridCursorIteratorWrapper; import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; +import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.h2.engine.Session; import org.h2.index.Cursor; import org.h2.index.IndexLookupBatch; @@ -322,6 +324,12 @@ if (!i.hasNext()) return EMPTY_CURSOR; + long time = System.currentTimeMillis(); + + IndexingQueryFilter qryFilter = threadLocalFilter(); + + IndexingQueryCacheFilter qryCacheFilter = qryFilter != null ? qryFilter.forCache(getTable().cacheName()) : null; + List rows = new ArrayList<>(); do { @@ -329,11 +337,15 @@ assert row != null; + if (row.expireTime() != 0 && row.expireTime() <= time) + continue; + + if (qryCacheFilter == null || qryCacheFilter.applyPartition(row.partition())) - rows.add(row); + rows.add(row); } while (i.hasNext()); - return filter(new GridCursorIteratorWrapper(rows.iterator()), threadLocalFilter()); + return new GridCursorIteratorWrapper(rows.iterator()); } /** {@inheritDoc} */ \ No newline at end of file