diff --git a/src/java/org/apache/hadoop/hbase/io/hfile/Compression.java b/src/java/org/apache/hadoop/hbase/io/hfile/Compression.java index 29da283..91346dc 100644 --- a/src/java/org/apache/hadoop/hbase/io/hfile/Compression.java +++ b/src/java/org/apache/hadoop/hbase/io/hfile/Compression.java @@ -33,6 +33,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.DefaultCodec; /** * Compression related stuff. @@ -72,25 +73,35 @@ public final class Compression { */ public static enum Algorithm { LZO("lzo") { + // Use base type to avoid compile-time dependencies. + private DefaultCodec lzoCodec; @Override - CompressionCodec getCodec() { - throw new UnsupportedOperationException("LZO compression is disabled for now"); - } - @Override - public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { - throw new UnsupportedOperationException("LZO compression is disabled for now"); - } - @Override - public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { - throw new UnsupportedOperationException("LZO compression is disabled for now"); + DefaultCodec getCodec() { + if (lzoCodec == null) { + Configuration conf = new Configuration(); + conf.setBoolean("hadoop.native.lib", true); + try { + Class externalCodec = + ClassLoader.getSystemClassLoader().loadClass("com.hadoop.compression.lzo.LzoCodec"); + lzoCodec = (DefaultCodec) externalCodec.newInstance(); + lzoCodec.setConf(conf); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + return lzoCodec; } }, GZ("gz") { private GzipCodec codec; @Override - CompressionCodec getCodec() { + DefaultCodec getCodec() { if (codec == null) { Configuration conf = new Configuration(); conf.setBoolean("hadoop.native.lib", true); @@ -100,45 +111,11 @@ public final class Compression { return codec; } - - @Override - public synchronized InputStream createDecompressionStream( - InputStream downStream, Decompressor decompressor, - int downStreamBufferSize) throws IOException { - // Set the internal buffer size to read from down stream. - if (downStreamBufferSize > 0) { - codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize); - } - CompressionInputStream cis = - codec.createInputStream(downStream, decompressor); - BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); - return bis2; - } - - @Override - public synchronized OutputStream createCompressionStream( - OutputStream downStream, Compressor compressor, - int downStreamBufferSize) throws IOException { - OutputStream bos1 = null; - if (downStreamBufferSize > 0) { - bos1 = new BufferedOutputStream(downStream, downStreamBufferSize); - } - else { - bos1 = downStream; - } - codec.getConf().setInt("io.file.buffer.size", 32 * 1024); - CompressionOutputStream cos = - codec.createOutputStream(bos1, compressor); - BufferedOutputStream bos2 = - new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), - DATA_OBUF_SIZE); - return bos2; - } }, NONE("none") { @Override - CompressionCodec getCodec() { + DefaultCodec getCodec() { return null; } @@ -179,15 +156,42 @@ public final class Compression { this.compressName = name; } - abstract CompressionCodec getCodec(); + abstract DefaultCodec getCodec(); - public abstract InputStream createDecompressionStream( + public InputStream createDecompressionStream( InputStream downStream, Decompressor decompressor, - int downStreamBufferSize) throws IOException; + int downStreamBufferSize) throws IOException { + DefaultCodec codec = getCodec(); + // Set the internal buffer size to read from down stream. + if (downStreamBufferSize > 0) { + codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize); + } + CompressionInputStream cis = + codec.createInputStream(downStream, decompressor); + BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); + return bis2; + + } - public abstract OutputStream createCompressionStream( + public OutputStream createCompressionStream( OutputStream downStream, Compressor compressor, int downStreamBufferSize) - throws IOException; + throws IOException { + DefaultCodec codec = getCodec(); + OutputStream bos1 = null; + if (downStreamBufferSize > 0) { + bos1 = new BufferedOutputStream(downStream, downStreamBufferSize); + } + else { + bos1 = downStream; + } + codec.getConf().setInt("io.file.buffer.size", 32 * 1024); + CompressionOutputStream cos = + codec.createOutputStream(bos1, compressor); + BufferedOutputStream bos2 = + new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), + DATA_OBUF_SIZE); + return bos2; + } public Compressor getCompressor() { CompressionCodec codec = getCodec();