diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index b8615cb..3a5b781 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -198,16 +198,14 @@ public void clear() throws StreamingIOFailure {
 
   @Override
   public void newBatch(Long minTxnId, Long maxTxnID)
           throws StreamingIOFailure, SerializationError {
-    try {
-      LOG.debug("Creating Record updater");
-      curBatchMinTxnId = minTxnId;
-      curBatchMaxTxnId = maxTxnID;
-      updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID);
-    } catch (IOException e) {
-      String errMsg = "Failed creating RecordUpdaterS for " + getWatermark();
-      LOG.error(errMsg, e);
-      throw new StreamingIOFailure(errMsg, e);
-    }
+    curBatchMinTxnId = minTxnId;
+    curBatchMaxTxnId = maxTxnID;
+    // Pre-fill with nulls: new ArrayList(capacity) creates an EMPTY list, so
+    // getRecordUpdater()'s updaters.get(bucketId) would otherwise throw
+    // IndexOutOfBoundsException.  Updaters are now created lazily per bucket.
+    updaters = new ArrayList<RecordUpdater>(totalBuckets);
+    for (int bucket = 0; bucket < totalBuckets; bucket++) {
+      updaters.add(null);
+    }
   }
 
   @Override
@@ -252,17 +250,6 @@ public void closeBatch() throws StreamingIOFailure {
     return bucketFieldData;
   }
 
-
-
-  private ArrayList createRecordUpdaters(int bucketCount, Long minTxnId, Long maxTxnID)
-          throws IOException, SerializationError {
-    ArrayList result = new ArrayList(bucketCount);
-    for (int bucket = 0; bucket < bucketCount; bucket++) {
-      result.add(createRecordUpdater(bucket, minTxnId, maxTxnID) );
-    }
-    return result;
-  }
-
   private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
           throws IOException, SerializationError {
     try {
@@ -286,6 +273,26 @@ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxT
     }
   }
 
+  /**
+   * Returns the {@link RecordUpdater} for the given bucket, creating it on
+   * first use.  Updaters are created lazily so a batch only opens files for
+   * buckets that actually receive records.
+   * Note: uses set(), not add() — add(int, E) would insert and shift the
+   * remaining slots, corrupting the bucket-to-updater mapping.
+   */
+  RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, SerializationError {
+    RecordUpdater recordUpdater = updaters.get(bucketId);
+    if (recordUpdater == null) {
+      try {
+        recordUpdater = createRecordUpdater(bucketId, curBatchMinTxnId, curBatchMaxTxnId);
+      } catch (IOException e) {
+        String errMsg = "Failed creating RecordUpdater for " + getWatermark();
+        LOG.error(errMsg, e);
+        throw new StreamingIOFailure(errMsg, e);
+      }
+      updaters.set(bucketId, recordUpdater);
+    }
+    return recordUpdater;
+  }
+
   private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint)
           throws StreamingException {
     try {
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 7ab2fc6..87eb4c4 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -262,7 +262,7 @@ public void write(long transactionId, byte[] record)
       byte[] orderedFields = reorderFields(record);
       Object encodedRow = encode(orderedFields);
       int bucket = getBucket(encodedRow);
-      updaters.get(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction ("
               + transactionId + ")", e);
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 1facad1..31212ee 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -122,7 +122,7 @@ public void write(long transactionId, byte[] record)
     try {
       Object encodedRow = encode(record);
       int bucket = getBucket(encodedRow);
-      updaters.get(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction("
               + transactionId + ")", e);