commit 299e4c1013734855bf5cc2651880e5a78c0eac6a
Author: Lee Dongjin <dongjin@apache.org>
Date:   Fri Jun 21 03:03:23 2019 +0900

    Add GZip[Input, Output]Stream, which calls System#runFinalization on close to avoid a surge of Inflater/Deflater

diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 352d12d83..0085755b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -28,8 +28,6 @@ import java.lang.invoke.MethodHandle;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
 import java.nio.ByteBuffer;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 
 /**
  * The compression type to use
@@ -54,7 +52,7 @@ public enum CompressionType {
                 // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
                 // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
                 // number of bytes to write (potentially a single byte)
-                return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
+                return new BufferedOutputStream(new GZipOutputStream(buffer, 8 * 1024), 16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -66,7 +64,7 @@ public enum CompressionType {
                 // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
                 // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
                 // number of bytes (potentially a single byte)
-                return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
+                return new BufferedInputStream(new GZipInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
                         16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
diff --git a/clients/src/main/java/org/apache/kafka/common/record/GZipInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/GZipInputStream.java
new file mode 100644
index 000000000..97b5942d7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/GZipInputStream.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Inflater;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * An extension of {@link GZIPInputStream}, which calls System#runFinalization on close to avoid a surge of {@link Inflater}.
+ */
+public class GZipInputStream extends GZIPInputStream {
+    /**
+     * Creates a new {@link InputStream} with the specified buffer size.
+     *
+     * @param in the input stream
+     * @param size the input buffer size
+     * @exception IOException if an I/O error has occurred
+     * @exception IllegalArgumentException if {@code size <= 0}
+     */
+    public GZipInputStream(InputStream in, int size) throws IOException {
+        super(in, size);
+    }
+
+    /**
+     * Closes this input stream,  releases any system resources associated with the stream, and call {@link System#runFinalization()}
+     * to avoid a surge of {@link Inflater}.
+     *
+     * @exception IOException if an I/O error has occurred
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        System.runFinalization();
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/GZipOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/GZipOutputStream.java
new file mode 100644
index 000000000..955005a37
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/GZipOutputStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * An extension of {@link GZIPOutputStream}, which calls System#runFinalization on close to avoid a surge of {@link Deflater}.
+ */
+public class GZipOutputStream extends GZIPOutputStream {
+    /**
+     * Creates a new {@link OutputStream} with the specified buffer size.
+     *
+     * @param out   the output stream
+     * @param size  the output buffer size
+     * @throws IOException If an I/O error has occurred.
+     */
+    GZipOutputStream(OutputStream out, int size) throws IOException {
+        super(out, size);
+    }
+
+    /**
+     * Writes remaining compressed data to the output stream, closes the underlying stream, and call {@link System#runFinalization()}
+     * to avoid a surge of {@link Deflater}.
+     *
+     * @exception IOException if an I/O error has occurred
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        System.runFinalization();
+    }
+}
