diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 2980028..9e90d36 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -142,7 +142,9 @@ public void init(StreamingConnection conn, long minWriteId, long maxWriteId) thr
     try {
       URI uri = new URI(location);
       this.fs = FileSystem.newInstance(uri, conf);
-      LOG.info("Created new filesystem instance: {}", System.identityHashCode(this.fs));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Created new filesystem instance: {}", System.identityHashCode(this.fs));
+      }
     } catch (URISyntaxException e) {
       throw new StreamingException("Unable to create URI from location: " + location, e);
     } catch (IOException e) {
@@ -197,7 +199,7 @@ protected void setupMemoryMonitoring() {
     this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
     this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
     this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);
-    LOG.info("Memory monitorings settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}",
+    LOG.info("Memory monitoring settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}",
       autoFlush, memoryUsageThreshold, ingestSizeBytes);
     this.heapMemoryMonitor = new HeapMemoryMonitor(memoryUsageThreshold);
     MemoryUsage tenuredMemUsage = heapMemoryMonitor.getTenuredGenMemoryUsage();
@@ -329,9 +331,13 @@ protected int getBucket(Object row) {
   @Override
   public void flush() throws StreamingIOFailure {
     try {
-      logStats("Stats before flush:");
+      if (LOG.isDebugEnabled()) {
+        logStats("Stats before flush:");
+      }
       for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
-        LOG.info("Flushing record updater for partitions: {}", entry.getKey());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Flushing record updater for partitions: {}", entry.getKey());
+        }
         for (RecordUpdater updater : entry.getValue()) {
           if (updater != null) {
             updater.flush();
@@ -339,7 +345,9 @@ public void flush() throws StreamingIOFailure {
         }
       }
       ingestSizeBytes = 0;
-      logStats("Stats after flush:");
+      if (LOG.isDebugEnabled()) {
+        logStats("Stats after flush:");
+      }
     } catch (IOException e) {
       throw new StreamingIOFailure("Unable to flush recordUpdater", e);
     }
@@ -349,10 +357,14 @@ public void close() throws StreamingIOFailure {
     boolean haveError = false;
     String partition = null;
-    logStats("Stats before close:");
+    if (LOG.isDebugEnabled()) {
+      logStats("Stats before close:");
+    }
     for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
       partition = entry.getKey();
-      LOG.info("Closing updater for partitions: {}", partition);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing updater for partitions: {}", partition);
+      }
       for (RecordUpdater updater : entry.getValue()) {
         if (updater != null) {
           try {
@@ -367,7 +379,9 @@ public void close() throws StreamingIOFailure {
       entry.getValue().clear();
     }
     updaters.clear();
-    logStats("Stats after close:");
+    if (LOG.isDebugEnabled()) {
+      logStats("Stats after close:");
+    }
     if (haveError) {
       throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition));
     }
@@ -432,8 +446,10 @@ protected void checkAutoFlush() throws StreamingIOFailure {
     }
     if (lowMemoryCanary != null) {
       if (lowMemoryCanary.get() && ingestSizeBytes > ingestSizeThreshold) {
-        LOG.info("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " +
-          "Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " +
+            "Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+        }
         flush();
         conn.getConnectionStats().incrementAutoFlushCount();
         lowMemoryCanary.set(false);
@@ -444,8 +460,10 @@ protected void checkAutoFlush() throws StreamingIOFailure {
      MemoryUsage heapUsage = mxBean.getHeapMemoryUsage();
      float memUsedFraction = ((float) heapUsage.getUsed() / (float) heapUsage.getMax());
      if (memUsedFraction > memoryUsageThreshold) {
-        LOG.info("Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " +
-          "Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " +
+            "Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+        }
        flush();
        conn.getConnectionStats().incrementAutoFlushCount();
      }
@@ -498,9 +516,13 @@ protected RecordUpdater getRecordUpdater(List<String> partitionValues, int bucke
       // partitions to TxnHandler
       if (!partitionInfo.isExists()) {
         addedPartitions.add(partitionInfo.getName());
-        LOG.info("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+        }
       } else {
-        LOG.info("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+        }
       }
       destLocation = new Path(partitionInfo.getPartitionLocation());
     }
@@ -550,7 +572,7 @@ protected void logStats(final String prefix) {
       oldGenUsage = "used/max => " + LlapUtil.humanReadableByteCount(memoryUsage.getUsed()) + "/" +
         LlapUtil.humanReadableByteCount(memoryUsage.getMax());
     }
-    LOG.info("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " +
+    LOG.debug("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " +
       "buffered-ingest-size: {}, total-ingest-size: {} tenured-memory-usage: {}]", prefix, openRecordUpdaters,
       partitionPaths.size(), bufferedRecords, conn.getConnectionStats().getRecordsWritten(),
       LlapUtil.humanReadableByteCount(ingestSizeBytes),
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 7adbadd..6cf14b0 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -552,7 +552,9 @@ public void close() {
       getMSC().close();
       getHeatbeatMSC().close();
     }
-    LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
+    }
   }

   @Override
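
Note on the pattern applied above: besides lowering per-updater and per-partition messages from INFO to DEBUG, the patch wraps them in LOG.isDebugEnabled(). SLF4J's parameterized messages defer string formatting, but the Java argument expressions themselves (for example System.identityHashCode(this.fs) or LlapUtil.humanReadableByteCount(ingestSizeThreshold)) are still evaluated before the level check, so the guard skips that work on the hot write path when debug logging is off. The sketch below illustrates the idiom under that assumption; the class and method names are illustrative only and are not part of this patch.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch of the guarded-logging idiom used in the patch above.
public class GuardedLoggingExample {
  private static final Logger LOG = LoggerFactory.getLogger(GuardedLoggingExample.class);

  void writeBatch(byte[] batch) {
    // Unguarded: expensiveSummary(batch) would run even with DEBUG disabled,
    // because arguments are evaluated before SLF4J checks the level.
    // LOG.debug("Wrote batch: {}", expensiveSummary(batch));

    // Guarded: the summary is computed only when DEBUG is actually enabled.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Wrote batch: {}", expensiveSummary(batch));
    }
  }

  // Stand-in for an expensive-to-build argument, e.g. a stats snapshot.
  private String expensiveSummary(byte[] batch) {
    return "len=" + batch.length;
  }
}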