diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index b8615cb..4ae239e 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -178,7 +178,9 @@ protected int getBucket(Object row) throws SerializationError {
   public void flush() throws StreamingIOFailure {
     try {
       for (RecordUpdater updater : updaters) {
-        updater.flush();
+        if (updater != null) {
+          updater.flush();
+        }
       }
     } catch (IOException e) {
       throw new StreamingIOFailure("Unable to flush recordUpdater", e);
@@ -198,15 +200,11 @@ public void clear() throws StreamingIOFailure {
 
   @Override
   public void newBatch(Long minTxnId, Long maxTxnID)
           throws StreamingIOFailure, SerializationError {
-    try {
-      LOG.debug("Creating Record updater");
-      curBatchMinTxnId = minTxnId;
-      curBatchMaxTxnId = maxTxnID;
-      updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID);
-    } catch (IOException e) {
-      String errMsg = "Failed creating RecordUpdaterS for " + getWatermark();
-      LOG.error(errMsg, e);
-      throw new StreamingIOFailure(errMsg, e);
+    curBatchMinTxnId = minTxnId;
+    curBatchMaxTxnId = maxTxnID;
+    updaters = new ArrayList();
+    for (int bucket = 0; bucket < totalBuckets; bucket++) {
+      updaters.add(bucket, null);
     }
   }
@@ -214,13 +212,14 @@ public void newBatch(Long minTxnId, Long maxTxnID)
   public void closeBatch() throws StreamingIOFailure {
     boolean haveError = false;
     for (RecordUpdater updater : updaters) {
-      try {
-        //try not to leave any files open
-        updater.close(false);
-      }
-      catch(Exception ex) {
-        haveError = true;
-        LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
+      if (updater != null) {
+        try {
+          //try not to leave any files open
+          updater.close(false);
+        } catch (Exception ex) {
+          haveError = true;
+          LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
+        }
       }
     }
     updaters.clear();
@@ -252,17 +251,6 @@ public void closeBatch() throws StreamingIOFailure {
     return bucketFieldData;
   }
 
-
-
-  private ArrayList createRecordUpdaters(int bucketCount, Long minTxnId, Long maxTxnID)
-          throws IOException, SerializationError {
-    ArrayList result = new ArrayList(bucketCount);
-    for (int bucket = 0; bucket < bucketCount; bucket++) {
-      result.add(createRecordUpdater(bucket, minTxnId, maxTxnID) );
-    }
-    return result;
-  }
-
   private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
           throws IOException, SerializationError {
     try {
@@ -286,6 +274,21 @@ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxT
     }
   }
 
+  RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, SerializationError {
+    RecordUpdater recordUpdater = updaters.get(bucketId);
+    if (recordUpdater == null) {
+      try {
+        recordUpdater = createRecordUpdater(bucketId, curBatchMinTxnId, curBatchMaxTxnId);
+      } catch (IOException e) {
+        String errMsg = "Failed creating RecordUpdater for " + getWatermark();
+        LOG.error(errMsg, e);
+        throw new StreamingIOFailure(errMsg, e);
+      }
+      updaters.set(bucketId, recordUpdater);
+    }
+    return recordUpdater;
+  }
+
   private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint)
           throws StreamingException {
     try {
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 7ab2fc6..87eb4c4 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -262,7 +262,7 @@ public void write(long transactionId, byte[] record)
       byte[] orderedFields = reorderFields(record);
       Object encodedRow = encode(orderedFields);
       int bucket = getBucket(encodedRow);
-      updaters.get(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction ("
               + transactionId + ")", e);
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 1facad1..31212ee 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -122,7 +122,7 @@ public void write(long transactionId, byte[] record)
     try {
       Object encodedRow = encode(record);
       int bucket = getBucket(encodedRow);
-      updaters.get(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction("
               + transactionId + ")", e);
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 40cf2b5..fefa862 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -1223,13 +1223,10 @@ public void testBucketing() throws Exception {
     System.err.println(actual2);
 
     // assert bucket listing is as expected
-    Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 4);
+    Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 3);
     Assert.assertEquals("records in bucket does not match expectation", actual1.get(0).size(), 2);
     Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1);
-    Assert.assertEquals("records in bucket does not match expectation", actual1.get(2).size(), 0);
     Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1);
-
-
   }
   private void runCmdOnDriver(String cmd) throws QueryFailedException {
     boolean t = runDDL(driver, cmd);
@@ -1381,7 +1378,7 @@ public void testFileDumpCorruptDataFiles() throws Exception {
 
     String errDump = new String(myErr.toByteArray());
     Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted"));
+    Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted"));
     Assert.assertEquals(false, errDump.contains("is still open for writes."));
 
     origErr = System.err;
@@ -1397,9 +1394,6 @@
     Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!"));
     Assert.assertEquals(true, errDump.contains("No readable footers found. Creating empty orc file."));
     Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!"));
-    Assert.assertEquals(true, errDump.contains("bucket_00002 recovered successfully!"));
-    // check for bucket2's last readable footer offset
-    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: [" + readableFooter + "]"));
     Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!"));
     Assert.assertEquals(false, errDump.contains("Exception"));
     Assert.assertEquals(false, errDump.contains("is still open for writes."));
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index f81752f..0587e80 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -778,13 +778,13 @@ public void minorCompactAfterAbort() throws Exception {
       Path resultDelta = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000001_0000006")) {
+        if (names[i].equals("delta_0000001_0000004")) {
           resultDelta = stat[i].getPath();
         }
       }
       Arrays.sort(names);
       String[] expected = new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"};
+          "delta_0000001_0000004", "delta_0000003_0000004"};
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
@@ -844,11 +844,11 @@ public void majorCompactAfterAbort() throws Exception {
         Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat));
       }
       if (1 != stat.length) {
-        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      if (!name.equals("base_0000006")) {
-        Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
+      if (!name.equals("base_0000004")) {
+        Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004");
       }
       checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
     } finally {
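
Appended note, not part of the patch: the change makes AbstractRecordWriter fill `updaters` with null placeholders in newBatch() and create each bucket's RecordUpdater on first use via getRecordUpdater(bucketId); flush() and closeBatch() skip the null slots, and DelimitedInputWriter/StrictJsonWriter now insert through getRecordUpdater(). The effect is that buckets that never receive a row no longer produce delta files, which is what the adjusted test expectations reflect. Below is a minimal usage sketch of the hcatalog streaming API these classes belong to; the metastore URI, database/table, and column names are hypothetical placeholders.

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class LazyBucketWriteSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint: a bucketed, transactional, unpartitioned table.
    HiveEndPoint endPt =
        new HiveEndPoint("thrift://localhost:9083", "testing", "bucketed_table", null);
    StreamingConnection conn = endPt.newConnection(true);
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[]{"key", "data"}, ",", endPt);
    TransactionBatch txnBatch = conn.fetchTransactionBatch(2, writer);
    try {
      txnBatch.beginNextTransaction();
      // With the lazy getRecordUpdater() path, only the buckets these two keys
      // hash to get RecordUpdaters (and hence delta files); buckets that never
      // see a row produce no files for this batch.
      txnBatch.write("a1,one".getBytes());
      txnBatch.write("b2,two".getBytes());
      txnBatch.commit();
    } finally {
      txnBatch.close();
      conn.close();
    }
  }
}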