Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 4.6.0
Description
I'll add more details later, but here's the scenario that consistently produces a wrong row count for the index table vs. the base table for an immutable async index.
1. Start data upsert
2. Create async index
3. Trigger M/R index build
4. Keep the data upsert going in the background during steps 2 and 3, and for a while after the M/R index build finishes.
5. End the data upsert.
Now the count with the index enabled vs. the count with a hint to not use the index is off by a large factor. I will get a cleaner repro for this issue soon.
Attachments
- output_reads.log (8 kB, Thomas D'Silva)
- output_writes.log (254 kB, Thomas D'Silva)
- output.log (414 kB, Thomas D'Silva)
- PHOENIX-2446_partial.patch (8 kB, James R. Taylor)
- PHOENIX-2446_testfix.patch (12 kB, James R. Taylor)
- PHOENIX-2446.patch (24 kB, Thomas D'Silva)
- PHOENIX-2446-v4.patch (71 kB, Thomas D'Silva)
- PHOENIX-2446-v5.patch (82 kB, Thomas D'Silva)
- PHOENIX-2446-v6.patch (87 kB, Thomas D'Silva)
- PHOENIX-2446-wip.patch (45 kB, Thomas D'Silva)
- PHOENIX-2446-wip-v3.patch (113 kB, Thomas D'Silva)
- server.log (39 kB, Thomas D'Silva)
Issue Links
- is related to: PHOENIX-2582 Prevent need of catch up query when creating non transactional index (Open)
Activity
Can you identify the rows in the base table that are missing from the index and see if there's a pattern in the cell timestamps? For example, are they the rows with timestamps nearest to the timestamp of the index rows? Do all the cell timestamps of the index rows from the initial population have the same timestamp? They should match the timestamp at which the index was created.
For one reproducible case involving transactions, see IndexIT.testCreateIndexAfterUpsertStarted() and the associated FIXME comments for PHOENIX-2446.
The case that is failing is when a commit starts before an index exists but commits after the index build has completed. For transactional data this is problematic, because the index gets a timestamp after the commit of the data table mutation, and thus these mutations won't be seen during the commit. Also, when the index is being built, the data hasn't yet been committed and thus won't be part of the initial index build.
This doesn't shed any light on the non-transactional case where this is occurring, though.
A different idea than the sleep: what if we ran the upsert/select at a lower priority than normal? That way, the other work would naturally finish first. A second idea is to do a manual flush after the create index, but before the upsert/select. Would you mind trying those ideas?
I was thinking about my idea above a bit, and I don't think it'll help in all cases. The trickiest one is when a big UPSERT SELECT is running when a CREATE INDEX occurs. All of the rows being upserted will be timestamped back in time (at the timestamp at which the data was read). We need to do that so that the scans reading the data won't see the data being written (and get into an infinite loop when reading/writing to the same table).
The sleep we're doing may help some simple cases, like an UPSERT VALUES occurring at the same time as the CREATE INDEX, but it's unlikely to help for the above case. One way to deal with this would be for the client to detect that a CREATE INDEX occurred while the UPSERT SELECT is running and start to do incremental maintenance on the rows being upserted. We'd need to update the metadata cache using LATEST_TIMESTAMP, at which point the index would be found and incremental index maintenance would start.
One consideration is if a DELETE is being executed at the same time as the UPSERT SELECT (I think this would be an issue even outside of this overlapping create index/mutation case). In theory, mutable indexes should handle this as they handle out-of-order updates. For the immutable case, we could:
1. Ignore it and document it, as deletes over immutable data are more of a test-environment type of feature. The one case where it's not is DROP of a view, but this case could be handled by detecting it when we update the metadata cache (as we'd no longer find the view).
2. Tell the server that these indexes are mutable and let the out-of-order logic handle this case. Unless we always treat these indexes as mutable, we can't handle the DELETE-at-the-same-time-as-an-UPSERT case 100% correctly; but if we always treat them as mutable, we'd lose the perf benefit of indexes over immutable data.
Based on this, I think (1) is a better option with clear docs on the interplay between DELETE and UPSERT. To handle it this way, we'd need to:
- Issue the updateCache call we do in MutationState.validate() at the LATEST_TIMESTAMP if the table is not transactional. This will handle all the mutation cases: UPSERT VALUES, UPSERT SELECT, and DELETE, essentially initiating incremental index maintenance before the index has been created to ensure that we don't miss any rows. Worst case, we'll be issuing duplicate mutations (but that's better than not issuing enough). A rough sketch follows this list.
- If upon updating the metadata cache we do not find the table any longer and the table is immutable, then the mutation should not be performed (the logic being that a DROP was performed after the mutation started).
- None of this logic will be necessary for transactional tables as we have a different mechanism that relies on the transaction manager to detect these cases for us (see PHOENIX-2478).
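A rough sketch of the first two points above (illustrative only; the updateCache call mirrors the FromCompiler snippet quoted below, while the variable names and the drop-detection flow are assumptions):
// Hedged sketch, intended to live in MutationState.validate(): for non-transactional tables,
// refresh the cached PTable so that an index created while this mutation was in flight is
// picked up and maintained incrementally. 'table', 'schemaName', and 'tableName' are
// illustrative stand-ins for the values available from the TableRef being validated.
// Assumption: with no SCN set on the connection, updateCache resolves at LATEST_TIMESTAMP.
if (!table.isTransactional()) {
    MetaDataClient client = new MetaDataClient(connection);
    MetaDataMutationResult result = client.updateCache(schemaName, tableName);
    if (result.getTable() == null && table.isImmutableRows()) {
        // Illustrative: the table/view was dropped after the mutation started, so skip the mutation.
        return;
    }
    // Otherwise, continue with result.getTable() so newly added indexes are maintained incrementally.
}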
Make sense, tdsilva?
If we create an index while a big UPSERT SELECT is running, wouldn't the scn of the index population UPSERT SELECT be after the timestamp of the large UPSERT SELECT rows being written, so the initial index population would write these rows?
Yes, the scn of the index population would be after the ts of the UPSERT SELECT, but the UPSERT SELECT may still be running when the CREATE INDEX is issued. That's the root of the problem, though - we may not have all the table rows that are earlier than the initial index population ts. The fix I propose will essentially turn on incremental index population for in-flight statements that may be in the process of adding data table rows when the CREATE INDEX statement is issued.
One more small change that I didn't mention before will be necessary too. When auto commit is on, we call updateCache for UPSERT VALUES from FromCompiler here:
protected TableRef createTableRef(NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
    String tableName = tableNode.getName().getTableName();
    String schemaName = tableNode.getName().getSchemaName();
    long timeStamp = QueryConstants.UNSET_TIMESTAMP;
    String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
    PName tenantId = connection.getTenantId();
    PTable theTable = null;
    if (updateCacheImmediately || connection.getAutoCommit()) {
        MetaDataMutationResult result = client.updateCache(schemaName, tableName);
Instead, we should always let the updateCache call be done in MutationState.validate(), so the following if statement should be removed:
private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
    Long scn = connection.getSCN();
    MetaDataClient client = new MetaDataClient(connection);
    long serverTimeStamp = tableRef.getTimeStamp();
    // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
    // so no need to do it again here.
    if (!connection.getAutoCommit()) {
I have attached a wip patch that does incremental index maintenance on rows being upserted. I tested this fix by manually creating an index while rows were being upserted, but it does not fix the original row count issue.
I will see if running the upsert/select at a lower priority than normal or doing a manual flush after the create index fixes the issue.
Thanks,
Thomas
Thanks, tdsilva. Have you figured out why it didn't solve the issue?
I think the original row count issue happens when an index is created while a large batch of rows is being written. When the initial index population UPSERT SELECT is run, it won't be able to see some rows of this batch. I cannot reproduce the issue for smaller batch sizes.
The incremental index maintenance fix also doesn't help since the batch was already sent to the server before the index was created.
If we run the index population UPSERT SELECT at a lower priority, would the number of region server handlers matter? If we have more handlers than requests, then would the UPSERT SELECT run while the in-progress batch is being written on the region server?
To test manual flushing, should I call flush() on the data table using HBaseAdmin?
Thanks,
Thomas
I tried calling flush using HBaseAdmin after creating the index table and before running the UPSERT SELECT and this fixes the issue when testing manually.
A flush writes the data from the memstore to disk; however, if the data is already present in the memstore, then the UPSERT SELECT would already be able to see it, so I am not sure that adding the flush would fix the issue in all cases. Is adding a flush() equivalent to the sleep?
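For reference, a minimal sketch of that manual flush, assuming the 0.98/1.x-era HBaseAdmin API (the table name is illustrative; newer versions use Admin.flush(TableName)):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class FlushDataTable {
    public static void main(String[] args) throws Exception {
        // Hedged sketch: flush the data table after CREATE INDEX and before the index
        // population UPSERT SELECT, so memstore contents are forced out to disk.
        Configuration conf = HBaseConfiguration.create();
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            admin.flush("MY_DATA_TABLE"); // hypothetical table name
        }
    }
}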
I'm not sure of the exact processing that occurs during a flush, but logically it writes the data that's in the memstore to disk. How long does it take, and if you sleep for that amount of time (instead of doing the flush), do your tests pass?
Have you tried running the UPSERT/SELECT at a lower priority?
I suppose we need to understand the timing of everything to understand why it's failing. Is the data in flight when the index population runs? Or does the data arrive at the server while the UPSERT/SELECT is running?
Would be good to make a timeline like this:
- Compile CREATE INDEX IDX ON T statement
- Resolve table T, getting back a server timestamp of t0.
- Execute CREATE INDEX statement
- Create HBase metadata for new IDX table (index on view or local index may not create any new metadata)
- Insert Phoenix metadata for new index
- Execute UPSERT SELECT to populate index using t0 for scan and put timestamp
- Execute n scans in parallel chunk-by-chunk for each guidepost and submit batch mutation for initial index population.
- Mark new index as active (i.e. updating Phoenix metadata through coprocessor call)
The above is time ordered, so the UPSERT SELECT is running as of an earlier timestamp. When does the other client's batch of mutations have to hit the server for the problem to occur?
I have attached a log file which contains logging from HRegionServer and HRegion. I used pherf to load data and created an index while the data was being loaded. After the data load was completed there were 500,000 rows in the data table, but only 495,990 rows in the index table (4,010 rows were missing). The CREATE INDEX statement returned 0 rows created.
I added a log line to HRegion.doMiniBatchMutation() after the call to mvcc.completeMemstoreInsert(w), which advances the mvcc and makes the batch visible to scanners.
I also added a log line to HRegionServer.scan() when the server first gets the scan, to print out the max timestamp of the scan. I added another log line just before the server returns the results to the client, to get the number of rows returned.
I only included logs for rows written that are not part of the incremental index maintenance (all of these rows should have been picked up by the UPSERT SELECT) and for scans that were part of the UPSERT SELECT.
From the log, all the scans complete before the call to advance the mvcc in HRegion, and so the scans all return 0 rows.
If the UPSERT SELECT runs when there are rows being processed in HRegion before the call to advance the mvcc, then the UPSERT SELECT won't be able to see these rows.
The total number of rows written is 8,020, which is twice the number of missing rows (not sure why it's double).
That's good information. So how can we guarantee that the mvcc gets advanced before the UPSERT SELECT runs? Would a flush do that, or would it potentially not see the data yet either because the mvcc hasn't been advanced? Is a sleep our only option, and if so, would a very minimal sleep (1-5 seconds) do the trick? Of course, there's always stuff that can delay the advance of the mvcc.
lhofhansl - any advice for us here? FWIW, we've taken a different approach for transactional tables, leveraging the transaction manager where we can detect this situation reliably.
I don't think a flush will guarantee that pending batches will be put in the memstore; I think it just flushes whatever is in the memstore. When I tested using a flush before running the UPSERT SELECT, whether the UPSERT SELECT is able to see the rows depends on how long the flush takes. If I initiate the index build after a few batches of rows (<5000) are sent, the flush completes quickly and the UPSERT SELECT is not able to see the rows. If I kick off the create index after ~300000 rows are sent, then the flush takes longer and the UPSERT SELECT is able to see the rows.
A one second sleep solves the issue when testing manually.
I can't think of a way to detect that all batches of rows sent before incremental index maintenance was enabled are in the memstore before we run the index population UPSERT SELECT.
Chatted with James a bit. So to recap, the problem is the HBase MVCC transactions in flight before the index is created; those would be missed since they did not exist when the index was created and are also not yet seen by a more or less parallel upsert select statement, right?
A flush won't help.
The only thing that I can see would help is to await at least one MVCC transaction on all region servers. That is annoying, but doable.
I.e., calling mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()) will force all prior MVCC transactions - if any - to finish and then return.
The mvcc can be retrieved from the region interface with the getMVCC() method.
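A minimal sketch of that wait, assuming the 0.98/1.x MultiVersionConsistencyControl API and server-side code that already has an HRegion in hand:
// Hedged sketch: block until every MVCC transaction that began before this call has completed,
// so previously submitted batches become visible to subsequent scans on this region.
// 'region' is an HRegion available to the caller (e.g. from a coprocessor environment).
MultiVersionConsistencyControl mvcc = region.getMVCC();
MultiVersionConsistencyControl.WriteEntry entry = mvcc.beginMemstoreInsert();
mvcc.completeMemstoreInsert(entry);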
(As an aside, I'd add that upsert select won't scale to the kind of data size where HBase/Phoenix would actually be interesting. For less than maybe a few 100m rows, one should use Postgres or equivalent.)
Thanks, lhofhansl. That's exactly what we were looking for.
We can't predict when an index will be added to a table, so we'd like it to work reliably. Some potential scenarios:
- A user may have a smallish table (100m rows or less) and decide to add an index at that point.
- A user may be adding an index to a view which isn't yet a multi-billion row table (perhaps the physical table is, though).
- A user may have orthogonal smaller tables being managed by Phoenix because their bigger ones are there too (it's not easy to use an RDBMS and a Hadoop-based store seamlessly).
tdsilva - please try calling MetaDataClient.waitForPreviousTransactionsComplete(PTable physicalDataTable) instead of the sleep. This uses the method that Lars recommended for ensuring that in-flight batches are processed.
tdsilva - I attached a fix for the transactional case of this issue over on PHOENIX-2478. I'm also attaching a modified version of your test with a couple of minor tweaks:
- create the index with an IF NOT EXISTS clause, as the coprocessor will fire again when we replay the transaction and we'd get an error if it already existed
- throw a DoNotRetryIOException from the coprocessor as you don't want to trigger the 30x retry loop of HBase
- lower the number of rows - you don't need to create many rows to repro given the way you've set up the test
- query the table with no hint (as it'll use the index). The old code would have been fine too, but it didn't include the schema name of the index.
It passes now, but only when transactional is true.
I have attached logs which include the mvcc write pointer for batches being written, and the mvcc read pointer for scans.
Calling mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()) does not fix the issue.
However, if I set hbase.regionserver.handler.count to 1, which means there is one queue and one handler on the region server, it fixes the issue. By default there are 3 queues and 30 handlers. I think the scan for the index population was getting picked up before all the data table batched writes were processed. Setting the handler count to 1 forces all the batched data table writes that were sent to be processed before the index population scan.
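For anyone trying to reproduce this ordering in a test, a minimal sketch of pinning the handler count (the property name comes from the comment above; everything else is illustrative):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hedged sketch: force a single RPC handler so all pending data table batches are processed
// before the index population scan is picked up (the default is 30 handlers across 3 queues).
Configuration conf = HBaseConfiguration.create();
conf.setInt("hbase.regionserver.handler.count", 1);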
That's good to know, tdsilva, but I don't think we can use that as a fix. Most HBase users will use the default handler count.
One alternative would be to run a catch-up UPSERT SELECT after the index-populating UPSERT SELECT is complete. We could restrict the scans to run with a time range. The lower bound of the time range would be the time at which we resolved the data table when we ran the first UPSERT SELECT, and the upper bound would be some configurable delta (100 ms default?).
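A minimal sketch of restricting the catch-up scan to such a time range, assuming raw HBase Scan APIs and illustrative variable names:
// Hedged sketch: only re-read rows written between the time the data table was resolved for
// the first UPSERT SELECT and that time plus a configurable delta (e.g. 100 ms by default).
long lowerBound = dataTableResolveTimestamp;       // illustrative: ts at which the data table was resolved
long upperBound = lowerBound + catchUpDeltaMillis; // illustrative: configurable delta
Scan catchUpScan = new Scan();
catchUpScan.setTimeRange(lowerBound, upperBound);  // may throw IOException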
Yes, that seems like the best way to fix it for now; I discussed this approach with James offline yesterday. I am working on a patch.
Yep, thanks, samarthjain - we like your idea - that's what tdsilva is working on.
Patch looks good, tdsilva. Does it fix the issue? Here's some minor feedback:
- Do we really need the default sleep time to be 30 seconds? Would 5 seconds be enough?
- Don't do the catch-up UPSERT SELECT if the table is transactional (as it's not needed).
- I don't think the min time range is being set correctly in MetaDataClient here (shouldn't it be index.getTimeStamp()-delta):
long minTimestamp = index.getTimeStamp()-sleepTime;
- I don't think we need to add the Dependencies stuff in MutationState (unless you really need that for your test verification). We already have a lot of existing tests that verify that the data table and index are consistent after mutations.
- No need to check for the index being a local index in MetaDataClient here, as local indexes are handled in the if branch:
if (index.getIndexType() != IndexType.LOCAL && sleepTime>0) {
- We will want to run the catch-up for local indexes (that aren't transactional) too, but you'll need to invoke the same code that's in the if statement (not run the UPSERT SELECT). The idea with the PostIndexDDLCompiler is that it abstracts this. Ideally, we could have a PostLocalIndexDDLCompiler and you could just move the existing code into that. If that proves difficult, then just factor out the code in the if statement to a new MetaDataClient method.
Thanks for the feedback, I will make the changes. Regarding setting the min time range of the second upsert select:
The delta might be negative if the first upsert select takes longer than the sleep time.
Should the min time range be index.getTimeStamp() - ( MAX(sleepTime, firstUpsertSelectDuration) ) ?
I was thinking the second upsert select wouldn't be run if the delta <= 0, but that's probably not quite right. Your idea sounds good, but we probably want index.getTimeStamp() - MAX(1, MAX(sleepTime, firstUpsertSelectDuration)), since we'd never want to run it with the same timestamp for min time and max time.
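In code, that lower bound would look something like this (a sketch; the variable names are illustrative, taken from this thread):
// Hedged sketch: lower bound of the catch-up UPSERT SELECT's time range. The Math.max(1, ...)
// keeps the min time strictly below the max time, index.getTimeStamp().
long minTimestamp = index.getTimeStamp()
        - Math.max(1L, Math.max(sleepTime, firstUpsertSelectDuration));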
+1. Couple of fix-on-commit nits: remove commented out code in PostLocalIndexDDLCompiler and make sure indenting is correct in MetaDataClient (and other places).
Thanks, tdsilva - nice work. Do your manual tests all pass too?
One other minor issue: make sure you rethrow on an InterruptedException.
+ } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+ }
Should look like this instead:
+ } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+     throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
+ }
FAILURE: Integrated in Phoenix-master #1100 (See https://builds.apache.org/job/Phoenix-master/1100/)
PHOENIX-2446 Immutable index - Index vs base table row count does not (tdsilva: rev a138cfe0f3df47091a0d9fe0285a8e572d76b252)
- phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
- phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
- phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
- phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
- phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
- phoenix-protocol/src/main/PTable.proto
- phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
- phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
- phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
- phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
- phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
- phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
- phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
- phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
- phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
- phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
- phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
- phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
- phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
- phoenix-protocol/src/main/MetaDataService.proto
- phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
- phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
- phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
- phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
- phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
- phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java
- phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
- phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
- phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
- phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
- phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
Some more observations: