diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 5c15675..0c6b9ea 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -65,6 +65,8 @@ final AcidOutputFormat outf; private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write. + private Long curBatchMinTxnId; + private Long curBatchMaxTxnId; protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) throws ConnectionError, StreamingException { @@ -98,6 +100,12 @@ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) } } + /** + * used to tag error msgs to provied some breadcrumbs + */ + String getWatermark() { + return partitionPath + " txnIds[" + curBatchMinTxnId + "," + curBatchMaxTxnId + "]"; + } // return the column numbers of the bucketed columns private List getBucketColIDs(List bucketCols, List cols) { ArrayList result = new ArrayList(bucketCols.size()); @@ -164,22 +172,32 @@ public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingIOFailure, SerializationError { try { LOG.debug("Creating Record updater"); + curBatchMinTxnId = minTxnId; + curBatchMaxTxnId = maxTxnID; updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID); } catch (IOException e) { - LOG.error("Failed creating record updater", e); - throw new StreamingIOFailure("Unable to get new record Updater", e); + String errMsg = "Failed creating RecordUpdaterS for " + getWatermark(); + LOG.error(errMsg, e); + throw new StreamingIOFailure(errMsg, e); } } @Override public void closeBatch() throws StreamingIOFailure { - try { - for (RecordUpdater updater : updaters) { + boolean haveError = false; + for (RecordUpdater updater : updaters) { + try { + //try not to leave any files open updater.close(false); } - updaters.clear(); - } catch (IOException e) { - throw new StreamingIOFailure("Unable to close recordUpdater", e); + catch(Exception ex) { + haveError = true; + LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex); + } + } + updaters.clear(); + if(haveError) { + throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark()); } } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java index ffa51c9..03f6a44 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java @@ -29,6 +29,7 @@ public ConnectionError(String msg, Exception innerEx) { } public ConnectionError(HiveEndPoint endPoint, Exception innerEx) { - super("Error connecting to " + endPoint, innerEx); + super("Error connecting to " + endPoint + + (innerEx == null ? "" : ": " + innerEx.getMessage()), innerEx); } } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java index 4f1154e..394cc54 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java @@ -243,7 +243,7 @@ public void write(long transactionId, byte[] record) } @Override - public SerDe getSerde() throws SerializationError { + public SerDe getSerde() { return serde; } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 2f2d44a..4c77842 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -48,6 +48,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -503,7 +504,6 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo } // class ConnectionImpl - private static class TransactionBatchImpl implements TransactionBatch { private final String username; private final UserGroupInformation ugi; @@ -512,27 +512,28 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo private final RecordWriter recordWriter; private final List txnIds; - private int currentTxnIndex; + private int currentTxnIndex = -1; private final String partNameForLock; private TxnState state; private LockRequest lockRequest = null; + /** + * once any operation on this batch encounters a system exception + * (e.g. IOException on write) it's safest to assume that we can't write to the + * file backing this batch any more. This guards important public methods + */ + private volatile boolean isClosed = false; /** * Represents a batch of transactions acquired from MetaStore * - * @param user - * @param ugi - * @param endPt - * @param numTxns - * @param msClient - * @param recordWriter * @throws StreamingException if failed to create new RecordUpdater for batch * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch */ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter) throws StreamingException, TransactionBatchUnAvailable, InterruptedException { + boolean success = false; try { if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) { Table tableObj = msClient.getTable(endPt.database, endPt.table); @@ -549,15 +550,18 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn txnIds = openTxnImpl(msClient, user, numTxns, ugi); - - this.currentTxnIndex = -1; this.state = TxnState.INACTIVE; recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); + success = true; } catch (TException e) { throw new TransactionBatchUnAvailable(endPt, e); } catch (IOException e) { throw new TransactionBatchUnAvailable(endPt, e); } + finally { + //clean up if above throws + markDead(success); + } } private List openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi) @@ -589,6 +593,7 @@ public String toString() { @Override public void beginNextTransaction() throws TransactionError, ImpersonationFailed, InterruptedException { + checkIsClosed(); if (ugi==null) { beginNextTransactionImpl(); return; @@ -610,10 +615,12 @@ public Void run() throws TransactionError { } private void beginNextTransactionImpl() throws TransactionError { + state = TxnState.INACTIVE;//clear state from previous txn if ( currentTxnIndex >= txnIds.size() ) throw new InvalidTrasactionState("No more transactions available in" + " current batch for end point : " + endPt); ++currentTxnIndex; + state = TxnState.OPEN; lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId()); try { LockResponse res = msClient.lock(lockRequest); @@ -623,8 +630,6 @@ private void beginNextTransactionImpl() throws TransactionError { } catch (TException e) { throw new TransactionError("Unable to acquire lock on " + endPt, e); } - - state = TxnState.OPEN; } /** @@ -640,7 +645,7 @@ public Long getCurrentTxnId() { } /** - * get state of current tramsaction + * get state of current transaction * @return */ @Override @@ -672,26 +677,35 @@ public int remainingTransactions() { */ @Override public void write(final byte[] record) - throws StreamingException, InterruptedException, - ImpersonationFailed { - if (ugi==null) { - recordWriter.write(getCurrentTxnId(), record); + throws StreamingException, InterruptedException { + write(Collections.singletonList(record)); + } + private void checkIsClosed() throws IllegalStateException { + if(isClosed) { + throw new IllegalStateException("TransactionBatch " + toString() + " has been closed()"); + } + } + /** + * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue + * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail). + * This ensures that a client can't ignore these failures and continue to write. + */ + private void markDead(boolean success) { + if(success) { return; } + isClosed = true;//also ensures that heartbeat() is no-op since client is likely doing it async try { - ugi.doAs ( - new PrivilegedExceptionAction() { - @Override - public Void run() throws StreamingException { - recordWriter.write(getCurrentTxnId(), record); - return null; - } - } - ); - } catch (IOException e) { - throw new ImpersonationFailed("Failed wirting as user '" + username + - "' to endPoint :" + endPt + ". Transaction Id: " - + getCurrentTxnId(), e); + abort(true);//abort all remaining txns + } + catch(Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + } + try { + closeImpl(); + } + catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); } } @@ -707,24 +721,37 @@ public Void run() throws StreamingException { public void write(final Collection records) throws StreamingException, InterruptedException, ImpersonationFailed { - if (ugi==null) { - writeImpl(records); - return; - } + checkIsClosed(); + boolean success = false; try { - ugi.doAs ( - new PrivilegedExceptionAction() { - @Override - public Void run() throws StreamingException { - writeImpl(records); - return null; - } - } - ); - } catch (IOException e) { + if (ugi == null) { + writeImpl(records); + } else { + ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + writeImpl(records); + return null; + } + } + ); + } + success = true; + } catch(SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won't help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } catch(IOException e){ throw new ImpersonationFailed("Failed writing as user '" + username + - "' to endPoint :" + endPt + ". Transaction Id: " - + getCurrentTxnId(), e); + "' to endPoint :" + endPt + ". Transaction Id: " + + getCurrentTxnId(), e); + } + finally { + markDead(success); } } @@ -746,25 +773,31 @@ private void writeImpl(Collection records) @Override public void commit() throws TransactionError, StreamingException, ImpersonationFailed, InterruptedException { - if (ugi==null) { - commitImpl(); - return; - } + checkIsClosed(); + boolean success = false; try { - ugi.doAs ( - new PrivilegedExceptionAction() { - @Override - public Void run() throws StreamingException { - commitImpl(); - return null; - } + if (ugi == null) { + commitImpl(); + } + else { + ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + commitImpl(); + return null; } - ); + } + ); + } + success = true; } catch (IOException e) { throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '" + username + "'on endPoint :" + endPt + ". Transaction Id: ", e); } - + finally { + markDead(success); + } } private void commitImpl() throws TransactionError, StreamingException { @@ -791,8 +824,20 @@ private void commitImpl() throws TransactionError, StreamingException { @Override public void abort() throws TransactionError, StreamingException , ImpersonationFailed, InterruptedException { + if(isClosed) { + /** + * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all + * remaining txns, so make this no-op to make sure that a well-behaved client that calls abort() + * error doesn't get misleading errors + */ + return; + } + abort(false); + } + private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException + , ImpersonationFailed, InterruptedException { if (ugi==null) { - abortImpl(); + abortImpl(abortAllRemaining); return; } try { @@ -800,7 +845,7 @@ public void abort() throws TransactionError, StreamingException new PrivilegedExceptionAction() { @Override public Void run() throws StreamingException { - abortImpl(); + abortImpl(abortAllRemaining); return null; } } @@ -811,11 +856,26 @@ public Void run() throws StreamingException { } } - private void abortImpl() throws TransactionError, StreamingException { + private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException { try { - recordWriter.clear(); - msClient.rollbackTxn(getCurrentTxnId()); + if(abortAllRemaining) { + //when last txn finished (abort/commit) the currentTxnIndex is pointing at that txn + //so we need to start from next one, if any. Also if batch was created but + //fetchTransactionBatch() was never called, we want to start with first txn + int minOpenTxnIndex = Math.max(currentTxnIndex + + (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); + for(currentTxnIndex = minOpenTxnIndex; + currentTxnIndex < txnIds.size(); currentTxnIndex++) { + msClient.rollbackTxn(txnIds.get(currentTxnIndex)); + } + } + else { + if (getCurrentTxnId() > 0) { + msClient.rollbackTxn(getCurrentTxnId()); + } + } state = TxnState.ABORTED; + recordWriter.clear(); } catch (NoSuchTxnException e) { throw new TransactionError("Unable to abort invalid transaction id : " + getCurrentTxnId(), e); @@ -827,6 +887,9 @@ private void abortImpl() throws TransactionError, StreamingException { @Override public void heartbeat() throws StreamingException, HeartBeatFailure { + if(isClosed) { + return; + } Long first = txnIds.get(currentTxnIndex); Long last = txnIds.get(txnIds.size()-1); try { @@ -840,14 +903,27 @@ public void heartbeat() throws StreamingException, HeartBeatFailure { } } + @Override + public boolean isClosed() { + return isClosed; + } /** - * Close the TransactionBatch + * Close the TransactionBatch. This will abort any still open txns in this batch. * @throws StreamingIOFailure I/O failure when closing transaction batch */ @Override public void close() throws StreamingException, ImpersonationFailed, InterruptedException { - if (ugi==null) { - state = TxnState.INACTIVE; + if(isClosed) { + return; + } + isClosed = true; + abortImpl(true);//abort proactively so that we don't wait for timeout + closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which + //will call RecordUpdater.close(boolean abort) + } + private void closeImpl() throws StreamingException, InterruptedException{ + state = TxnState.INACTIVE; + if(ugi == null) { recordWriter.closeBatch(); return; } @@ -856,7 +932,6 @@ public void close() throws StreamingException, ImpersonationFailed, InterruptedE new PrivilegedExceptionAction() { @Override public Void run() throws StreamingException { - state = TxnState.INACTIVE; recordWriter.closeBatch(); return null; } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java index 28ea7d6..db73d6b 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java @@ -86,7 +86,7 @@ public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) } @Override - public SerDe getSerde() throws SerializationError { + public SerDe getSerde() { return serde; } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java index d9a083d..3c8670d 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java @@ -100,4 +100,5 @@ * @throws InterruptedException if call in interrupted */ public void close() throws StreamingException, InterruptedException; + public boolean isClosed(); } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java index dd9c83d..198d077 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java @@ -20,7 +20,7 @@ public class TransactionError extends StreamingException { public TransactionError(String msg, Exception e) { - super(msg, e); + super(msg + (e == null ? "" : ": " + e.getMessage()), e); } public TransactionError(String msg) { diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 1723ff1..d38cdc0 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -29,11 +29,14 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; @@ -1189,7 +1192,120 @@ public void testBucketing() throws Exception { } + private void runCmdOnDriver(String cmd) throws QueryFailedException { + boolean t = runDDL(driver, cmd); + Assert.assertTrue(cmd + " failed", t); + } + + + @Test + public void testErrorHandling() throws Exception { + runCmdOnDriver("create database testErrors"); + runCmdOnDriver("use testErrors"); + runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null); + DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt); + FaultyWriter writer = new FaultyWriter(innerWriter); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.close(); + txnBatch.heartbeat();//this is no-op on closed batch + txnBatch.abort();//ditto + GetOpenTxnsInfoResponse r = msClient.showTxns(); + Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark()); + List ti = r.getOpen_txns(); + Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + + Exception expectedEx = null; + try { + txnBatch.beginNextTransaction(); + } + catch(IllegalStateException ex) { + expectedEx = ex; + } + Assert.assertTrue("beginNextTransaction() should have failed", + expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + expectedEx = null; + try { + txnBatch.write("name0,1,Hello streaming".getBytes()); + } + catch(IllegalStateException ex) { + expectedEx = ex; + } + Assert.assertTrue("write() should have failed", + expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + expectedEx = null; + try { + txnBatch.commit(); + } + catch(IllegalStateException ex) { + expectedEx = ex; + } + Assert.assertTrue("commit() should have failed", + expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + + txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.commit(); + + expectedEx = null; + txnBatch.beginNextTransaction(); + writer.enableErrors(); + try { + txnBatch.write("name6,2,Doh!".getBytes()); + } + catch(StreamingIOFailure ex) { + expectedEx = ex; + } + Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"), + expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred")); + expectedEx = null; + try { + txnBatch.commit(); + } + catch(IllegalStateException ex) { + expectedEx = ex; + } + Assert.assertTrue("commit() should have failed", + expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + + r = msClient.showTxns(); + Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark()); + ti = r.getOpen_txns(); + Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + //txnid 3 was committed and thus not open + Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState()); + + writer.disableErrors(); + txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + writer.enableErrors(); + expectedEx = null; + try { + txnBatch.commit(); + } + catch(StreamingIOFailure ex) { + expectedEx = ex; + } + Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"), + expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred")); + + r = msClient.showTxns(); + Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark()); + ti = r.getOpen_txns(); + Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState()); + Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState()); + txnBatch.abort(); + } // assumes un partitioned table // returns a map > @@ -1411,4 +1527,55 @@ public String toString() { " }"; } } + /** + * This is test-only wrapper around the real RecordWriter. + * It can simulate faults from lower levels to test error handling logic. + */ + private static final class FaultyWriter implements RecordWriter { + private final RecordWriter delegate; + private boolean shouldThrow = false; + + private FaultyWriter(RecordWriter delegate) { + assert delegate != null; + this.delegate = delegate; + } + @Override + public void write(long transactionId, byte[] record) throws StreamingException { + delegate.write(transactionId, record); + produceFault(); + } + @Override + public void flush() throws StreamingException { + delegate.flush(); + produceFault(); + } + @Override + public void clear() throws StreamingException { + delegate.clear(); + } + @Override + public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException { + delegate.newBatch(minTxnId, maxTxnID); + } + @Override + public void closeBatch() throws StreamingException { + delegate.closeBatch(); + } + + /** + * allows testing of "unexpected" errors + * @throws StreamingIOFailure + */ + private void produceFault() throws StreamingIOFailure { + if(shouldThrow) { + throw new StreamingIOFailure("Simulated fault occurred"); + } + } + void enableErrors() { + shouldThrow = true; + } + void disableErrors() { + shouldThrow = false; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 67c5a11..c002f12 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -267,6 +267,9 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(ROW_ID, rowId); } + public String toString() { + return getClass().getName() + "[" + path +"]"; + } /** * To handle multiple INSERT... statements in a single transaction, we want to make sure * to generate unique {@code rowId} for all inserted rows of the transaction.