diff --git streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 03c9fe00a3..fc9a2dd534 100644
--- streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -366,7 +366,9 @@ public void flush() throws StreamingIOFailure {
 
   @Override
   public void close() throws StreamingIOFailure {
-    heapMemoryMonitor.close();
+    if (heapMemoryMonitor != null) {
+      heapMemoryMonitor.close();
+    }
     boolean haveError = false;
     String partition = null;
     if (LOG.isDebugEnabled()) {
@@ -395,7 +397,9 @@ public void close() throws StreamingIOFailure {
       logStats("Stats after close:");
     }
     try {
-      this.fs.close();
+      if (this.fs != null) {
+        this.fs.close();
+      }
     } catch (IOException e) {
       throw new StreamingIOFailure("Error while closing FileSystem", e);
    }
@@ -630,7 +634,7 @@ protected void logStats(final String prefix) {
       .filter(Objects::nonNull)
       .mapToLong(RecordUpdater::getBufferedRowCount)
       .sum();
-    MemoryUsage memoryUsage = heapMemoryMonitor.getTenuredGenMemoryUsage();
+    MemoryUsage memoryUsage = heapMemoryMonitor == null ? null : heapMemoryMonitor.getTenuredGenMemoryUsage();
     String oldGenUsage = "NA";
     if (memoryUsage != null) {
       oldGenUsage = "used/max => " + LlapUtil.humanReadableByteCount(memoryUsage.getUsed()) + "/" +
diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 58b3ae2bd4..35a220facd 100644
--- streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -1819,6 +1819,27 @@ public void testTransactionBatchAbort() throws Exception {
 
   }
 
+  @Test(expected = ClassCastException.class)
+  public void testFileSystemError() throws Exception {
+    // Bad file system object, ClassCastException should occur during record writer init
+    conf.set("fs.raw.impl", Object.class.getName());
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
+  }
+
 
   @Test
   public void testTransactionBatchAbortAndCommit() throws Exception {
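
Note on the new test (commentary, not part of the patch): the expected ClassCastException comes from Hadoop's FileSystem factory, which reflectively instantiates the class named by fs.<scheme>.impl and casts it to FileSystem; pointing fs.raw.impl at java.lang.Object therefore fails as soon as the record writer resolves the destination file system during initialization. Below is a minimal standalone sketch of the same failure mode, assuming only hadoop-common on the classpath; the "raw" scheme, the URI, and the demo class name are illustrative and do not appear in the patch.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical demo class, not part of the patch.
public class FsImplCastDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Point the "raw" scheme at a class that is not a FileSystem subtype,
    // mirroring conf.set("fs.raw.impl", Object.class.getName()) in the test.
    conf.set("fs.raw.impl", Object.class.getName());
    // Hadoop instantiates the configured class reflectively and casts it to
    // FileSystem, so this call should fail with a ClassCastException.
    FileSystem fs = FileSystem.newInstance(URI.create("raw:///tmp"), conf);
    System.out.println(fs);
  }
}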