diff --git common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java
index 26528f9..0a0e5d0 100644
--- common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java
+++ common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java
@@ -18,10 +18,23 @@
 package org.apache.hadoop.hive.common;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 public class ValidTxnListImpl implements ValidTxnList {
 
+  static final private Log LOG = LogFactory.getLog(ValidTxnListImpl.class.getName());
+  final static private int MAX_UNCOMPRESSED_LENGTH = 256;
+  final static private char COMPRESSION_MARKER = 'C';
+  final static private String STRING_ENCODING = "ISO-8859-1";
+
   private long[] exceptions;
   private long highWatermark;
 
@@ -95,7 +108,25 @@ public String writeToString() {
         buf.append(except);
       }
     }
-    return buf.toString();
+    if (buf.length() > MAX_UNCOMPRESSED_LENGTH) {
+      try {
+        ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+        GZIPOutputStream gzip = new GZIPOutputStream(byteBuf);
+        gzip.write(buf.toString().getBytes());
+        gzip.close();
+        StringBuilder buf2 = new StringBuilder();
+        buf2.append(COMPRESSION_MARKER);
+        buf2.append(buf.length());
+        buf2.append(':');
+        buf2.append(byteBuf.toString(STRING_ENCODING));
+        return buf2.toString();
+      } catch (IOException e) {
+        LOG.error("Unable to compress transaction list, " + e.getMessage());
+        throw new RuntimeException(e);
+      }
+    } else {
+      return buf.toString();
+    }
   }
 
   @Override
@@ -104,11 +135,36 @@ public void readFromString(String src) {
       highWatermark = Long.MAX_VALUE;
       exceptions = new long[0];
     } else {
-      String[] values = src.split(":");
+      String[] values;
+      if (src.charAt(0) == COMPRESSION_MARKER) {
+        try {
+          int colon = src.indexOf(':');
+          int len = Integer.valueOf(src.substring(1, colon));
+          ByteArrayInputStream byteBuf =
+              new ByteArrayInputStream(src.substring(colon + 1).getBytes(STRING_ENCODING));
+          GZIPInputStream gzip = new GZIPInputStream(byteBuf);
+          byte[] buf = new byte[len];
+          int bytesRead = 0;
+          int offset = 0;
+          int maxReadLen = len;
+          do {
+            bytesRead = gzip.read(buf, offset, maxReadLen);
+            offset += bytesRead;
+            maxReadLen -= bytesRead;
+          } while (maxReadLen > 0);
+          values = new String(buf).split(":");
+        } catch (IOException e) {
+          LOG.error("Unable to decode compressed transaction list, " + e.getMessage());
+          throw new RuntimeException(e);
+        }
+
+      } else {
+        values = src.split(":");
+      }
       highWatermark = Long.parseLong(values[0]);
       exceptions = new long[values.length - 1];
-      for(int i = 1; i < values.length; ++i) {
-        exceptions[i-1] = Long.parseLong(values[i]);
+      for (int i = 1; i < values.length; ++i) {
+        exceptions[i - 1] = Long.parseLong(values[i]);
       }
     }
   }
diff --git common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java
new file mode 100644
index 0000000..73ba023
--- /dev/null
+++ common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link org.apache.hadoop.hive.common.ValidTxnListImpl}
+ */
+public class TestValidTxnImpl {
+
+  @Test
+  public void noExceptions() throws Exception {
+    ValidTxnList txnList = new ValidTxnListImpl(new long[0], 1);
+    String str = txnList.writeToString();
+    Assert.assertEquals("1:", str);
+    ValidTxnList newList = new ValidTxnListImpl();
+    newList.readFromString(str);
+    Assert.assertTrue(newList.isTxnCommitted(1));
+    Assert.assertFalse(newList.isTxnCommitted(2));
+  }
+
+  @Test
+  public void exceptions() throws Exception {
+    ValidTxnList txnList = new ValidTxnListImpl(new long[]{2L, 4L}, 5);
+    String str = txnList.writeToString();
+    Assert.assertEquals("5:2:4", str);
+    ValidTxnList newList = new ValidTxnListImpl();
+    newList.readFromString(str);
+    Assert.assertTrue(newList.isTxnCommitted(1));
+    Assert.assertFalse(newList.isTxnCommitted(2));
+    Assert.assertTrue(newList.isTxnCommitted(3));
+    Assert.assertFalse(newList.isTxnCommitted(4));
+    Assert.assertTrue(newList.isTxnCommitted(5));
+    Assert.assertFalse(newList.isTxnCommitted(6));
+  }
+
+  @Test
+  public void longEnoughToCompress() throws Exception {
+    long[] exceptions = new long[1000];
+    for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
+    ValidTxnList txnList = new ValidTxnListImpl(exceptions, 2000);
+    String str = txnList.writeToString();
+    Assert.assertEquals('C', str.charAt(0));
+    ValidTxnList newList = new ValidTxnListImpl();
+    newList.readFromString(str);
+    for (int i = 0; i < 100; i++) Assert.assertTrue(newList.isTxnCommitted(i));
+    for (int i = 100; i < 1100; i++) Assert.assertFalse(newList.isTxnCommitted(i));
+    for (int i = 1100; i < 2001; i++) Assert.assertTrue(newList.isTxnCommitted(i));
+    Assert.assertFalse(newList.isTxnCommitted(2001));
+  }
+}
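
For illustration, here is a minimal standalone sketch of the round-trip encoding that the patched writeToString()/readFromString() pair produces: a transaction list longer than MAX_UNCOMPRESSED_LENGTH is written as the COMPRESSION_MARKER 'C', the uncompressed length, a ':', and the GZIP bytes rendered as an ISO-8859-1 string, then decoded by reading the length prefix and inflating back into a buffer of that size. The class name, sample values, and the truncation guard below are mine for demonstration, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TxnListEncodingSketch {
  public static void main(String[] args) throws IOException {
    String plain = "2000:100:101:102";           // plain highWatermark:exception:exception... form

    // Encode: gzip the plain form, then prefix with 'C' and the uncompressed length.
    ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(byteBuf);
    gzip.write(plain.getBytes("ISO-8859-1"));
    gzip.close();
    String encoded = "C" + plain.length() + ":" + byteBuf.toString("ISO-8859-1");

    // Decode: the length prefix says how many bytes to expect after inflation.
    int colon = encoded.indexOf(':');
    int len = Integer.parseInt(encoded.substring(1, colon));
    ByteArrayInputStream in =
        new ByteArrayInputStream(encoded.substring(colon + 1).getBytes("ISO-8859-1"));
    GZIPInputStream unzip = new GZIPInputStream(in);
    byte[] buf = new byte[len];
    int offset = 0;
    while (offset < len) {
      int read = unzip.read(buf, offset, len - offset);
      if (read < 0) {
        break;                                   // truncated input; the patch loops until len bytes are read
      }
      offset += read;
    }
    System.out.println(new String(buf, 0, offset, "ISO-8859-1"));  // prints "2000:100:101:102"
  }
}

ISO-8859-1 maps every byte value to a distinct character, which is why the compressed bytes survive the round trip through a Java String.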