diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 5c15675..0c6b9ea 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -65,6 +65,8 @@ final AcidOutputFormat outf; private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write. + private Long curBatchMinTxnId; + private Long curBatchMaxTxnId; protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) throws ConnectionError, StreamingException { @@ -98,6 +100,12 @@ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) } } + /** + * used to tag error msgs to provied some breadcrumbs + */ + String getWatermark() { + return partitionPath + " txnIds[" + curBatchMinTxnId + "," + curBatchMaxTxnId + "]"; + } // return the column numbers of the bucketed columns private List getBucketColIDs(List bucketCols, List cols) { ArrayList result = new ArrayList(bucketCols.size()); @@ -164,22 +172,32 @@ public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingIOFailure, SerializationError { try { LOG.debug("Creating Record updater"); + curBatchMinTxnId = minTxnId; + curBatchMaxTxnId = maxTxnID; updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID); } catch (IOException e) { - LOG.error("Failed creating record updater", e); - throw new StreamingIOFailure("Unable to get new record Updater", e); + String errMsg = "Failed creating RecordUpdaterS for " + getWatermark(); + LOG.error(errMsg, e); + throw new StreamingIOFailure(errMsg, e); } } @Override public void closeBatch() throws StreamingIOFailure { - try { - for (RecordUpdater updater : updaters) { + boolean haveError = false; + for (RecordUpdater updater : updaters) { + try { + //try not to leave any files open updater.close(false); } - updaters.clear(); - } catch (IOException e) { - throw new StreamingIOFailure("Unable to close recordUpdater", e); + catch(Exception ex) { + haveError = true; + LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex); + } + } + updaters.clear(); + if(haveError) { + throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark()); } } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java index ffa51c9..03f6a44 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java @@ -29,6 +29,7 @@ public ConnectionError(String msg, Exception innerEx) { } public ConnectionError(HiveEndPoint endPoint, Exception innerEx) { - super("Error connecting to " + endPoint, innerEx); + super("Error connecting to " + endPoint + + (innerEx == null ? "" : ": " + innerEx.getMessage()), innerEx); } } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java index 4f1154e..394cc54 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java @@ -243,7 +243,7 @@ public void write(long transactionId, byte[] record) } @Override - public SerDe getSerde() throws SerializationError { + public SerDe getSerde() { return serde; } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 2f2d44a..b0480a2 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -18,6 +18,7 @@ package org.apache.hive.hcatalog.streaming; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.cli.CliSessionState; @@ -503,8 +504,8 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo } // class ConnectionImpl - - private static class TransactionBatchImpl implements TransactionBatch { + @VisibleForTesting + static class TransactionBatchImpl implements TransactionBatch { private final String username; private final UserGroupInformation ugi; private final HiveEndPoint endPt; @@ -512,27 +513,28 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo private final RecordWriter recordWriter; private final List txnIds; - private int currentTxnIndex; + private int currentTxnIndex = -1; private final String partNameForLock; private TxnState state; private LockRequest lockRequest = null; + /** + * once any operation on this batch encounters a system exception + * (e.g. IOException on write) it's safest to assume that we can't write to the + * file backing this batch any more. This guards important public methods + */ + private volatile boolean isClosed = false; /** * Represents a batch of transactions acquired from MetaStore * - * @param user - * @param ugi - * @param endPt - * @param numTxns - * @param msClient - * @param recordWriter * @throws StreamingException if failed to create new RecordUpdater for batch * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch */ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter) throws StreamingException, TransactionBatchUnAvailable, InterruptedException { + boolean success = false; try { if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) { Table tableObj = msClient.getTable(endPt.database, endPt.table); @@ -549,15 +551,18 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn txnIds = openTxnImpl(msClient, user, numTxns, ugi); - - this.currentTxnIndex = -1; this.state = TxnState.INACTIVE; recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); + success = true; } catch (TException e) { throw new TransactionBatchUnAvailable(endPt, e); } catch (IOException e) { throw new TransactionBatchUnAvailable(endPt, e); } + finally { + //clean up if above throws + markDead(success); + } } private List openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi) @@ -589,6 +594,7 @@ public String toString() { @Override public void beginNextTransaction() throws TransactionError, ImpersonationFailed, InterruptedException { + checkIsClosed(); if (ugi==null) { beginNextTransactionImpl(); return; @@ -610,10 +616,12 @@ public Void run() throws TransactionError { } private void beginNextTransactionImpl() throws TransactionError { + state = TxnState.INACTIVE;//clear state from previous txn if ( currentTxnIndex >= txnIds.size() ) throw new InvalidTrasactionState("No more transactions available in" + " current batch for end point : " + endPt); ++currentTxnIndex; + state = TxnState.OPEN; lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId()); try { LockResponse res = msClient.lock(lockRequest); @@ -623,8 +631,6 @@ private void beginNextTransactionImpl() throws TransactionError { } catch (TException e) { throw new TransactionError("Unable to acquire lock on " + endPt, e); } - - state = TxnState.OPEN; } /** @@ -640,7 +646,7 @@ public Long getCurrentTxnId() { } /** - * get state of current tramsaction + * get state of current transaction * @return */ @Override @@ -672,14 +678,16 @@ public int remainingTransactions() { */ @Override public void write(final byte[] record) - throws StreamingException, InterruptedException, - ImpersonationFailed { - if (ugi==null) { - recordWriter.write(getCurrentTxnId(), record); - return; - } + throws StreamingException, InterruptedException { + checkIsClosed(); + boolean success = false; try { - ugi.doAs ( + produceFault(); + if (ugi == null) { + recordWriter.write(getCurrentTxnId(), record); + } + else { + ugi.doAs( new PrivilegedExceptionAction() { @Override public Void run() throws StreamingException { @@ -687,12 +695,51 @@ public Void run() throws StreamingException { return null; } } - ); + ); + } + success = true; + } catch(SerializationError ex) { + success = true; + //propagate to give caller opportunity to handle this + throw ex; } catch (IOException e) { throw new ImpersonationFailed("Failed wirting as user '" + username + "' to endPoint :" + endPt + ". Transaction Id: " + getCurrentTxnId(), e); } + finally { + //may seem silly, but we need to propagate "recoverable" errors and place the batch in + //special state on all others + markDead(success); + } + } + private void checkIsClosed() throws IllegalStateException { + if(isClosed) { + throw new IllegalStateException("TransactionBatch " + toString() + " has been closed()"); + } + } + /** + * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue + * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail). + * This ensures that a client can't ignore these failures and continue to write. + */ + private void markDead(boolean success) { + if(success) { + return; + } + isClosed = true;//also ensures that heartbeat() is no-op since client is likely doing it async + try { + abort(true);//abort all remaining txns + } + catch(Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + } + try { + closeImpl(); + } + catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + } } @@ -707,24 +754,33 @@ public Void run() throws StreamingException { public void write(final Collection records) throws StreamingException, InterruptedException, ImpersonationFailed { - if (ugi==null) { - writeImpl(records); - return; - } + checkIsClosed(); + boolean success = false; try { - ugi.doAs ( - new PrivilegedExceptionAction() { - @Override - public Void run() throws StreamingException { - writeImpl(records); - return null; - } - } - ); - } catch (IOException e) { + if (ugi == null) { + writeImpl(records); + } else { + ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + writeImpl(records); + return null; + } + } + ); + } + success = true; + } catch(SerializationError ex) { + success = true; + throw ex; + } catch(IOException e){ throw new ImpersonationFailed("Failed writing as user '" + username + - "' to endPoint :" + endPt + ". Transaction Id: " - + getCurrentTxnId(), e); + "' to endPoint :" + endPt + ". Transaction Id: " + + getCurrentTxnId(), e); + } + finally { + markDead(success); } } @@ -746,30 +802,37 @@ private void writeImpl(Collection records) @Override public void commit() throws TransactionError, StreamingException, ImpersonationFailed, InterruptedException { - if (ugi==null) { - commitImpl(); - return; - } + checkIsClosed(); + boolean success = false; try { - ugi.doAs ( - new PrivilegedExceptionAction() { - @Override - public Void run() throws StreamingException { - commitImpl(); - return null; - } + if (ugi == null) { + commitImpl(); + } + else { + ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + commitImpl(); + return null; } - ); + } + ); + } + success = true; } catch (IOException e) { throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '" + username + "'on endPoint :" + endPt + ". Transaction Id: ", e); } - + finally { + markDead(success); + } } private void commitImpl() throws TransactionError, StreamingException { try { recordWriter.flush(); + produceFault(); msClient.commitTxn(txnIds.get(currentTxnIndex)); state = TxnState.COMMITTED; } catch (NoSuchTxnException e) { @@ -791,8 +854,20 @@ private void commitImpl() throws TransactionError, StreamingException { @Override public void abort() throws TransactionError, StreamingException , ImpersonationFailed, InterruptedException { + if(isClosed) { + /** + * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all + * remaining txns, so make this no-op to make sure that a well-behaved client that calls abort() + * error doesn't get misleading errors + */ + return; + } + abort(false); + } + private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException + , ImpersonationFailed, InterruptedException { if (ugi==null) { - abortImpl(); + abortImpl(abortAllRemaining); return; } try { @@ -800,7 +875,7 @@ public void abort() throws TransactionError, StreamingException new PrivilegedExceptionAction() { @Override public Void run() throws StreamingException { - abortImpl(); + abortImpl(abortAllRemaining); return null; } } @@ -811,11 +886,26 @@ public Void run() throws StreamingException { } } - private void abortImpl() throws TransactionError, StreamingException { + private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException { try { - recordWriter.clear(); - msClient.rollbackTxn(getCurrentTxnId()); + if(abortAllRemaining) { + //when last txn finished (abort/commit) the currentTxnIndex is pointing at that txn + //so we need to start from next one, if any. Also if batch was created but + //fetchTransactionBatch() was never called, we want to start with first txn + int minOpenTxnIndex = Math.max(currentTxnIndex + + (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); + for(currentTxnIndex = minOpenTxnIndex; + currentTxnIndex < txnIds.size(); currentTxnIndex++) { + msClient.rollbackTxn(txnIds.get(currentTxnIndex)); + } + } + else { + if (getCurrentTxnId() > 0) { + msClient.rollbackTxn(getCurrentTxnId()); + } + } state = TxnState.ABORTED; + recordWriter.clear(); } catch (NoSuchTxnException e) { throw new TransactionError("Unable to abort invalid transaction id : " + getCurrentTxnId(), e); @@ -827,6 +917,9 @@ private void abortImpl() throws TransactionError, StreamingException { @Override public void heartbeat() throws StreamingException, HeartBeatFailure { + if(isClosed) { + return; + } Long first = txnIds.get(currentTxnIndex); Long last = txnIds.get(txnIds.size()-1); try { @@ -840,14 +933,27 @@ public void heartbeat() throws StreamingException, HeartBeatFailure { } } + @Override + public boolean isClosed() { + return isClosed; + } /** - * Close the TransactionBatch + * Close the TransactionBatch. This will abort any still open txns in this batch. * @throws StreamingIOFailure I/O failure when closing transaction batch */ @Override public void close() throws StreamingException, ImpersonationFailed, InterruptedException { - if (ugi==null) { - state = TxnState.INACTIVE; + if(isClosed) { + return; + } + isClosed = true; + abortImpl(true);//abort proactively so that we don't wait for timeout + closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which + //will call RecordUpdater.close(boolean abort) + } + private void closeImpl() throws StreamingException, InterruptedException{ + state = TxnState.INACTIVE; + if(ugi == null) { recordWriter.closeBatch(); return; } @@ -856,7 +962,6 @@ public void close() throws StreamingException, ImpersonationFailed, InterruptedE new PrivilegedExceptionAction() { @Override public Void run() throws StreamingException { - state = TxnState.INACTIVE; recordWriter.closeBatch(); return null; } @@ -885,6 +990,21 @@ private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint, return rqstBuilder.build(); } + + /** + * allows testing of "unexpected" errors + * @throws StreamingIOFailure + */ + private void produceFault() throws StreamingIOFailure { + if(shouldThrow) { + throw new StreamingIOFailure("Simulated fault occurred"); + } + } + private boolean shouldThrow = false; + @VisibleForTesting + final void makeWriteAndCommitThrow() { + shouldThrow = true; + } } // class TransactionBatchImpl static HiveConf createHiveConf(Class clazz, String metaStoreUri) { diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java index 28ea7d6..db73d6b 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java @@ -86,7 +86,7 @@ public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) } @Override - public SerDe getSerde() throws SerializationError { + public SerDe getSerde() { return serde; } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java index d9a083d..3c8670d 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java @@ -100,4 +100,5 @@ * @throws InterruptedException if call in interrupted */ public void close() throws StreamingException, InterruptedException; + public boolean isClosed(); } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java index dd9c83d..198d077 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java @@ -20,7 +20,7 @@ public class TransactionError extends StreamingException { public TransactionError(String msg, Exception e) { - super(msg, e); + super(msg + (e == null ? "" : ": " + e.getMessage()), e); } public TransactionError(String msg) { diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 1723ff1..8eec677 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -29,11 +29,14 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; @@ -1189,7 +1192,119 @@ public void testBucketing() throws Exception { } + private void runCmdOnDriver(String cmd) throws QueryFailedException { + boolean t = runDDL(driver, cmd); + Assert.assertTrue(cmd + " failed", t); + } + + + @Test + public void testErrorHandling() throws Exception { + runCmdOnDriver("create database testErrors"); + runCmdOnDriver("use testErrors"); + runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null); + DelimitedInputWriter writer = new DelimitedInputWriter("a,b".split(","),",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.close(); + txnBatch.heartbeat();//this is no-op on closed batch + txnBatch.abort();//ditto + GetOpenTxnsInfoResponse r = msClient.showTxns(); + Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark()); + List ti = r.getOpen_txns(); + Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + + Exception expectedEx = null; + try { + txnBatch.beginNextTransaction(); + } + catch(IllegalStateException ex) { + expectedEx = ex; + } + Assert.assertTrue("beginNextTransaction() should have failed", + expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + expectedEx = null; + try { + txnBatch.write("name0,1,Hello streaming".getBytes()); + } + catch(IllegalStateException ex) { + expectedEx = ex; + } + Assert.assertTrue("write() should have failed", + expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + expectedEx = null; + try { + txnBatch.commit(); + } + catch(IllegalStateException ex) { + expectedEx = ex; + } + Assert.assertTrue("commit() should have failed", + expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + + txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.commit(); + + expectedEx = null; + txnBatch.beginNextTransaction(); + ((HiveEndPoint.TransactionBatchImpl)txnBatch).makeWriteAndCommitThrow(); + try { + txnBatch.write("name6,2,Doh!".getBytes()); + } + catch(StreamingIOFailure ex) { + expectedEx = ex; + } + Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"), + expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred")); + expectedEx = null; + try { + txnBatch.commit(); + } + catch(IllegalStateException ex) { + expectedEx = ex; + } + Assert.assertTrue("commit() should have failed", + expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + + r = msClient.showTxns(); + Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark()); + ti = r.getOpen_txns(); + Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + //txnid 3 was committed and thus not open + Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState()); + + + txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + ((HiveEndPoint.TransactionBatchImpl)txnBatch).makeWriteAndCommitThrow(); + expectedEx = null; + try { + txnBatch.commit(); + } + catch(StreamingIOFailure ex) { + expectedEx = ex; + } + Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"), + expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred")); + + r = msClient.showTxns(); + Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark()); + ti = r.getOpen_txns(); + Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState()); + Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState()); + + txnBatch.abort(); + } // assumes un partitioned table // returns a map > diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 67c5a11..c002f12 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -267,6 +267,9 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(ROW_ID, rowId); } + public String toString() { + return getClass().getName() + "[" + path +"]"; + } /** * To handle multiple INSERT... statements in a single transaction, we want to make sure * to generate unique {@code rowId} for all inserted rows of the transaction.