diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index e187ce1..1359dc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -493,6 +493,9 @@ public void flush() throws IOException {
   public void close(boolean abort) throws IOException {
     if (abort) {
       if (flushLengths == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Close on abort for path: {}.. Deleting..", path);
+        }
         fs.delete(path, false);
       }
     } else if (!writerClosed) {
@@ -500,25 +503,47 @@ public void close(boolean abort) throws IOException {
         // When split-update is enabled, we can choose not to write
         // any delta files when there are no inserts. In such cases only the delete_deltas
         // would be written & they are closed separately below.
-        if (writer != null && indexBuilder.acidStats.inserts > 0) {
-          writer.close(); // normal close, when there are inserts.
+        if (indexBuilder.acidStats.inserts > 0) {
+          if (writer != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Closing writer for path: {} acid stats: {}", path, indexBuilder.acidStats);
+            }
+            writer.close(); // normal close, when there are inserts.
+          }
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No insert events in path: {}.. Deleting..", path);
+          }
+          fs.delete(path, false);
         }
       } else {
         //so that we create empty bucket files when needed (but see HIVE-17138)
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initializing writer before close (to create empty buckets) for path: {}", path);
+        }
        initWriter();
        writer.close(); // normal close.
       }
       if (deleteEventWriter != null) {
         if (deleteEventIndexBuilder.acidStats.deletes > 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing delete event writer for path: {} acid stats: {}", path, indexBuilder.acidStats);
+          }
           // Only need to write out & close the delete_delta if there have been any.
           deleteEventWriter.close();
         } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No delete events in path: {}.. Deleting..", path);
+          }
           // Just remove delete_delta, if there have been no delete events.
           fs.delete(deleteEventPath, false);
         }
       }
     }
     if (flushLengths != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing and deleting flush length file for path: {}", path);
+      }
       flushLengths.close();
       fs.delete(OrcAcidUtils.getSideFile(path), false);
     }
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 0866850..1102706 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -24,6 +24,8 @@
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryUsage;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,6 +38,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HeapMemoryMonitor;
 import org.apache.hadoop.hive.common.JavaUtils;
@@ -99,6 +102,7 @@
   protected boolean autoFlush;
   protected float memoryUsageThreshold;
   protected long ingestSizeThreshold;
+  protected FileSystem fs;
 
   public AbstractRecordWriter(final String lineDelimiter) {
     this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ?
@@ -134,6 +138,15 @@ public void init(StreamingConnection conn, long minWriteId, long maxWriteId) thr
     this.conf = conn.getHiveConf();
     this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
     this.table = conn.getTable();
+    String location = table.getSd().getLocation();
+    try {
+      URI uri = new URI(location);
+      this.fs = FileSystem.get(uri, conf);
+    } catch (URISyntaxException e) {
+      throw new StreamingException("Unable to create URI from location: " + location, e);
+    } catch (IOException e) {
+      throw new StreamingException("Unable to get filesystem for location: " + location, e);
+    }
     this.inputColumns = table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList());
     this.inputTypes = table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList());
     if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
@@ -454,6 +467,7 @@ protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucket
     tblProperties.putAll(table.getParameters());
     return acidOutputFormat.getRecordUpdater(partitionPath,
       new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
         .inspector(outputRowObjectInspector)
         .bucket(bucketId)
         .tableProperties(tblProperties)
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f697211..1d3a4ec 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
@@ -64,6 +66,7 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,6 +201,18 @@ private HiveStreamingConnection(Builder builder) throws StreamingException {
     // isolated from the other transaction related RPC calls.
     this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode,
       "streaming-connection-heartbeat");
     validateTable();
+
+    // disable fs cache to avoid filesystem being closed when record writer is flushing/closing
+    String location = tableObject.getSd().getLocation();
+    String scheme;
+    try {
+      URI uri = new URI(location);
+      scheme = uri.getScheme();
+    } catch (URISyntaxException e) {
+      throw new StreamingException("Unable to determine filesystem scheme from table location: " + location, e);
+    }
+    setHiveConf(conf, String.format("fs.%s.impl.disable.cache", scheme));
+
     LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
   }
@@ -326,7 +341,10 @@ public HiveStreamingConnection connect() throws StreamingException {
       if (recordWriter == null) {
         throw new StreamingException("Record writer cannot be null for streaming connection");
       }
-      return new HiveStreamingConnection(this);
+      HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this);
+      ShutdownHookManager.addShutdownHook(streamingConnection::close);
+      Thread.setDefaultUncaughtExceptionHandler((t, e) -> streamingConnection.close());
+      return streamingConnection;
     }
   }
 
@@ -539,7 +557,7 @@ public void close() {
         currentTransactionBatch.close();
       }
     } catch (StreamingException e) {
-      LOG.error("Unable to close current transaction batch: " + currentTransactionBatch, e);
+      LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e);
     } finally {
       getMSC().close();
       getHeatbeatMSC().close();
@@ -818,21 +836,11 @@ private void checkIsClosed() throws StreamingException {
    * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail).
    * This ensures that a client can't ignore these failures and continue to write.
    */
-  private void markDead(boolean success) {
+  private void markDead(boolean success) throws StreamingException {
     if (success) {
       return;
     }
-    isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
-    try {
-      abort(true);//abort all remaining txns
-    } catch (Exception ex) {
-      LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
-    }
-    try {
-      closeImpl();
-    } catch (Exception ex) {
-      LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
-    }
+    close();
   }
@@ -957,9 +965,19 @@ public void close() throws StreamingException {
     if (isTxnClosed.get()) {
       return;
     }
-    isTxnClosed.set(true);
-    abortImpl(true);
-    closeImpl();
+    isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
+    try {
+      abort(true);//abort all remaining txns
+    } catch (Exception ex) {
+      LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      throw new StreamingException("Unable to abort", ex);
+    }
+    try {
+      closeImpl();
+    } catch (Exception ex) {
+      LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      throw new StreamingException("Unable to close", ex);
+    }
   }
 
   private void closeImpl() throws StreamingException {
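
For reviewers who want to exercise the changed close/shutdown path, below is a minimal client sketch, assuming the Hive 3 streaming API in org.apache.hive.streaming; it is illustrative only and not part of the patch, and the metastore URI, database, and table names ("default.alerts") are placeholders.

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class StreamingCloseExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.set("hive.metastore.uris", "thrift://localhost:9083"); // placeholder metastore URI

    // Delimited record writer; the connection drives its init/flush/close lifecycle.
    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .build();

    // "default.alerts" is a placeholder transactional (ACID) table.
    StreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("alerts")
        .withAgentInfo("example-agent")
        .withRecordWriter(writer)
        .withHiveConf(conf)
        .connect();
    try {
      connection.beginTransaction();
      connection.write("1,hello".getBytes());
      connection.write("2,world".getBytes());
      connection.commitTransaction();
    } finally {
      // Releases the current transaction batch and metastore clients; with this patch,
      // Builder#connect() also registers a shutdown hook so the same cleanup runs on
      // abrupt JVM exit.
      connection.close();
    }
  }
}
```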