Index: storage-drivers/hbase/build.xml =================================================================== --- storage-drivers/hbase/build.xml (revision 1211608) +++ storage-drivers/hbase/build.xml (working copy) @@ -35,7 +35,6 @@ - @@ -65,6 +64,8 @@ + + @@ -110,6 +111,8 @@ + + @@ -125,6 +128,31 @@ includes="hadoop-test-0.20.3-CDH3-SNAPSHOT.jar"/> + + + + + + + + + + + + + + + + + + + + + + + + - + + - + + @@ -269,6 +300,27 @@ + + + + + + + + + + + + + + + + + + + Tests failed! @@ -325,8 +377,10 @@ + + Index: storage-drivers/hbase/ivy.xml =================================================================== --- storage-drivers/hbase/ivy.xml (revision 1211608) +++ storage-drivers/hbase/ivy.xml (working copy) @@ -41,6 +41,11 @@ - + + + + + Index: storage-drivers/hbase/ivy/libraries.properties =================================================================== --- storage-drivers/hbase/ivy/libraries.properties (revision 1211608) +++ storage-drivers/hbase/ivy/libraries.properties (working copy) @@ -17,4 +17,5 @@ ivy.version=2.2.0 rats-lib.version=0.5.1 hbase.version=0.90.3 -zookeeper.version=3.3.1 +zookeeper.version=3.4.0 +thrift.version=0.7.0 Index: storage-drivers/hbase/if/transaction.thrift =================================================================== --- storage-drivers/hbase/if/transaction.thrift (revision 0) +++ storage-drivers/hbase/if/transaction.thrift (revision 0) @@ -0,0 +1,11 @@ +namespace java org.apache.hcatalog.hbase.snapshot.transaction.thrift +namespace cpp Apache.HCatalog.HBase + +struct StoreFamilyRevision { + 1: i64 revision, + 2: i64 timestamp +} + +struct StoreFamilyRevisionList { + 1: list revisionList +} Index: storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java =================================================================== --- storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java (revision 0) +++ storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java (revision 0) @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This class is used to store the revision and timestamp of a column family + * in a transaction. + * + */ +/** + * Autogenerated by Thrift Compiler (0.7.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + */ +package org.apache.hcatalog.hbase.snapshot.transaction.thrift; + +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StoreFamilyRevision implements org.apache.thrift.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StoreFamilyRevision"); + + private static final org.apache.thrift.protocol.TField REVISION_FIELD_DESC = new org.apache.thrift.protocol.TField("revision", org.apache.thrift.protocol.TType.I64, (short)1); + private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)2); + + public long revision; // required + public long timestamp; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + REVISION((short)1, "revision"), + TIMESTAMP((short)2, "timestamp"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // REVISION + return REVISION; + case 2: // TIMESTAMP + return TIMESTAMP; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __REVISION_ISSET_ID = 0; + private static final int __TIMESTAMP_ISSET_ID = 1; + private BitSet __isset_bit_vector = new BitSet(2); + + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.REVISION, new org.apache.thrift.meta_data.FieldMetaData("revision", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StoreFamilyRevision.class, metaDataMap); + } + + public StoreFamilyRevision() { + } + + public StoreFamilyRevision( + long revision, + long timestamp) + { + this(); + this.revision = revision; + setRevisionIsSet(true); + this.timestamp = timestamp; + setTimestampIsSet(true); + } + + /** + * Performs a deep copy on other. + */ + public StoreFamilyRevision(StoreFamilyRevision other) { + __isset_bit_vector.clear(); + __isset_bit_vector.or(other.__isset_bit_vector); + this.revision = other.revision; + this.timestamp = other.timestamp; + } + + public StoreFamilyRevision deepCopy() { + return new StoreFamilyRevision(this); + } + + @Override + public void clear() { + setRevisionIsSet(false); + this.revision = 0; + setTimestampIsSet(false); + this.timestamp = 0; + } + + public long getRevision() { + return this.revision; + } + + public StoreFamilyRevision setRevision(long revision) { + this.revision = revision; + setRevisionIsSet(true); + return this; + } + + public void unsetRevision() { + __isset_bit_vector.clear(__REVISION_ISSET_ID); + } + + /** Returns true if field revision is set (has been assigned a value) and false otherwise */ + public boolean isSetRevision() { + return __isset_bit_vector.get(__REVISION_ISSET_ID); + } + + public void setRevisionIsSet(boolean value) { + __isset_bit_vector.set(__REVISION_ISSET_ID, value); + } + + public long getTimestamp() { + return this.timestamp; + } + + public StoreFamilyRevision setTimestamp(long timestamp) { + this.timestamp = timestamp; + setTimestampIsSet(true); + return this; + } + + public void unsetTimestamp() { + __isset_bit_vector.clear(__TIMESTAMP_ISSET_ID); + } + + /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */ + public boolean isSetTimestamp() { + return __isset_bit_vector.get(__TIMESTAMP_ISSET_ID); + } + + public void setTimestampIsSet(boolean value) { + __isset_bit_vector.set(__TIMESTAMP_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case REVISION: + if (value == null) { + unsetRevision(); + } else { + setRevision((Long)value); + } + break; + + case TIMESTAMP: + if (value == null) { + unsetTimestamp(); + } else { + setTimestamp((Long)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case REVISION: + return Long.valueOf(getRevision()); + + case TIMESTAMP: + return Long.valueOf(getTimestamp()); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case REVISION: + return isSetRevision(); + case TIMESTAMP: + return isSetTimestamp(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof StoreFamilyRevision) + return this.equals((StoreFamilyRevision)that); + return false; + } + + public boolean equals(StoreFamilyRevision that) { + if (that == null) + return false; + + boolean this_present_revision = true; + boolean that_present_revision = true; + if (this_present_revision || that_present_revision) { + if (!(this_present_revision && that_present_revision)) + return false; + if (this.revision != that.revision) + return false; + } + + boolean this_present_timestamp = true; + boolean that_present_timestamp = true; + if (this_present_timestamp || that_present_timestamp) { + if (!(this_present_timestamp && that_present_timestamp)) + return false; + if (this.timestamp != that.timestamp) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(StoreFamilyRevision other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + StoreFamilyRevision typedOther = (StoreFamilyRevision)other; + + lastComparison = Boolean.valueOf(isSetRevision()).compareTo(typedOther.isSetRevision()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetRevision()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.revision, typedOther.revision); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetTimestamp()).compareTo(typedOther.isSetTimestamp()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTimestamp()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, typedOther.timestamp); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // REVISION + if (field.type == org.apache.thrift.protocol.TType.I64) { + this.revision = iprot.readI64(); + setRevisionIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // TIMESTAMP + if (field.type == org.apache.thrift.protocol.TType.I64) { + this.timestamp = iprot.readI64(); + setTimestampIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldBegin(REVISION_FIELD_DESC); + oprot.writeI64(this.revision); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC); + oprot.writeI64(this.timestamp); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("StoreFamilyRevision("); + boolean first = true; + + sb.append("revision:"); + sb.append(this.revision); + first = false; + if (!first) sb.append(", "); + sb.append("timestamp:"); + sb.append(this.timestamp); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + +} + Index: storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java =================================================================== --- storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java (revision 0) +++ storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java (revision 0) @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This class is used to store a list of StoreFamilyRevision for a column + * family in zookeeper. + * + */ +/** + * Autogenerated by Thrift Compiler (0.7.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + */ +package org.apache.hcatalog.hbase.snapshot.transaction.thrift; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class StoreFamilyRevisionList implements org.apache.thrift.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StoreFamilyRevisionList"); + + private static final org.apache.thrift.protocol.TField REVISION_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("revisionList", org.apache.thrift.protocol.TType.LIST, (short)1); + + public List revisionList; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + REVISION_LIST((short)1, "revisionList"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // REVISION_LIST + return REVISION_LIST; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.REVISION_LIST, new org.apache.thrift.meta_data.FieldMetaData("revisionList", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, StoreFamilyRevision.class)))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StoreFamilyRevisionList.class, metaDataMap); + } + + public StoreFamilyRevisionList() { + } + + public StoreFamilyRevisionList( + List revisionList) + { + this(); + this.revisionList = revisionList; + } + + /** + * Performs a deep copy on other. + */ + public StoreFamilyRevisionList(StoreFamilyRevisionList other) { + if (other.isSetRevisionList()) { + List __this__revisionList = new ArrayList(); + for (StoreFamilyRevision other_element : other.revisionList) { + __this__revisionList.add(new StoreFamilyRevision(other_element)); + } + this.revisionList = __this__revisionList; + } + } + + public StoreFamilyRevisionList deepCopy() { + return new StoreFamilyRevisionList(this); + } + + @Override + public void clear() { + this.revisionList = null; + } + + public int getRevisionListSize() { + return (this.revisionList == null) ? 0 : this.revisionList.size(); + } + + public java.util.Iterator getRevisionListIterator() { + return (this.revisionList == null) ? null : this.revisionList.iterator(); + } + + public void addToRevisionList(StoreFamilyRevision elem) { + if (this.revisionList == null) { + this.revisionList = new ArrayList(); + } + this.revisionList.add(elem); + } + + public List getRevisionList() { + return this.revisionList; + } + + public StoreFamilyRevisionList setRevisionList(List revisionList) { + this.revisionList = revisionList; + return this; + } + + public void unsetRevisionList() { + this.revisionList = null; + } + + /** Returns true if field revisionList is set (has been assigned a value) and false otherwise */ + public boolean isSetRevisionList() { + return this.revisionList != null; + } + + public void setRevisionListIsSet(boolean value) { + if (!value) { + this.revisionList = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case REVISION_LIST: + if (value == null) { + unsetRevisionList(); + } else { + setRevisionList((List)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case REVISION_LIST: + return getRevisionList(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case REVISION_LIST: + return isSetRevisionList(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof StoreFamilyRevisionList) + return this.equals((StoreFamilyRevisionList)that); + return false; + } + + public boolean equals(StoreFamilyRevisionList that) { + if (that == null) + return false; + + boolean this_present_revisionList = true && this.isSetRevisionList(); + boolean that_present_revisionList = true && that.isSetRevisionList(); + if (this_present_revisionList || that_present_revisionList) { + if (!(this_present_revisionList && that_present_revisionList)) + return false; + if (!this.revisionList.equals(that.revisionList)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(StoreFamilyRevisionList other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + StoreFamilyRevisionList typedOther = (StoreFamilyRevisionList)other; + + lastComparison = Boolean.valueOf(isSetRevisionList()).compareTo(typedOther.isSetRevisionList()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetRevisionList()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.revisionList, typedOther.revisionList); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // REVISION_LIST + if (field.type == org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list0 = iprot.readListBegin(); + this.revisionList = new ArrayList(_list0.size); + for (int _i1 = 0; _i1 < _list0.size; ++_i1) + { + StoreFamilyRevision _elem2; // required + _elem2 = new StoreFamilyRevision(); + _elem2.read(iprot); + this.revisionList.add(_elem2); + } + iprot.readListEnd(); + } + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (this.revisionList != null) { + oprot.writeFieldBegin(REVISION_LIST_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.revisionList.size())); + for (StoreFamilyRevision _iter3 : this.revisionList) + { + _iter3.write(oprot); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("StoreFamilyRevisionList("); + boolean first = true; + + sb.append("revisionList:"); + if (this.revisionList == null) { + sb.append("null"); + } else { + sb.append(this.revisionList); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + +} + Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (revision 0) @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + + +/** + * A FamiliyRevision class consists of a revision number and a expiration + * timestamp. When a write transaction starts, the transaction + * object is appended to the transaction list of the each column + * family and stored in the corresponding znode. When a write transaction is + * committed, the transaction object is removed from the list. + */ +class FamilyRevision implements + Comparable { + + private long revision; + + private long timestamp; + + /** + * Create a FamilyRevision object + * @param rev revision number + * @param ts expiration timestamp + */ + FamilyRevision(long rev, long ts) { + this.revision = rev; + this.timestamp = ts; + } + + long getRevision() { + return revision; + } + + long getExpireTimestamp() { + return timestamp; + } + + void setExpireTimestamp(long ts) { + timestamp = ts; + } + + @Override + public String toString() { + String description = "revision: " + revision + " ts: " + timestamp; + return description; + } + + @Override + public int compareTo(FamilyRevision o) { + long d = revision - o.getRevision(); + return (d < 0) ? -1 : (d > 0) ? 1 : 0; + } + + +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java (revision 0) @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hcatalog.hbase.snapshot.lock.LockListener; +import org.apache.hcatalog.hbase.snapshot.lock.WriteLock; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + + +/** + * This class generates revision id's for transactions. + */ +class IDGenerator implements LockListener{ + + private ZooKeeper zookeeper; + private String zNodeDataLoc; + private String zNodeLockBasePath; + private long id; + private static final Log LOG = LogFactory.getLog(IDGenerator.class); + + IDGenerator(ZooKeeper zookeeper, String tableName, String idGenNode) + throws IOException { + this.zookeeper = zookeeper; + this.zNodeDataLoc = idGenNode; + this.zNodeLockBasePath = PathUtil.getLockManagementNode(idGenNode); + } + + /** + * This method obtains a revision id for a transaction. + * + * @return revision ID + * @throws IOException + */ + public long obtainID() throws IOException{ + WriteLock wLock = new WriteLock(zookeeper, zNodeLockBasePath, Ids.OPEN_ACL_UNSAFE); + wLock.setLockListener(this); + try { + boolean lockGrabbed = wLock.lock(); + if (lockGrabbed == false) { + //TO DO : Let this request queue up and try obtaining lock. + throw new IOException("Unable to obtain lock to obtain id."); + } else { + id = incrementAndReadCounter(); + } + } catch (KeeperException e) { + LOG.warn("Exception while obtaining lock for ID.", e); + throw new IOException("Exception while obtaining lock for ID.", e); + } catch (InterruptedException e) { + LOG.warn("Exception while obtaining lock for ID.", e); + throw new IOException("Exception while obtaining lock for ID.", e); + } finally { + wLock.unlock(); + } + return id; + } + + /** + * This method reads the latest revision ID that has been used. The ID + * returned by this method cannot be used for transaction. + * @return revision ID + * @throws IOException + */ + public long readID() throws IOException{ + long curId; + try { + Stat stat = new Stat(); + byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat); + curId = Long.parseLong(new String(data,Charset.forName("UTF-8"))); + } catch (KeeperException e) { + LOG.warn("Exception while reading current revision id.", e); + throw new IOException("Exception while reading current revision id.", e); + } catch (InterruptedException e) { + LOG.warn("Exception while reading current revision id.", e); + throw new IOException("Exception while reading current revision id.",e); + } + + return curId; + } + + + private long incrementAndReadCounter() throws IOException{ + + long curId, usedId; + try { + Stat stat = new Stat(); + byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat); + usedId = Long.parseLong((new String(data,Charset.forName("UTF-8")))); + curId = usedId +1; + String lastUsedID = String.valueOf(curId); + zookeeper.setData(this.zNodeDataLoc, lastUsedID.getBytes(Charset.forName("UTF-8")), -1 ); + + } catch (KeeperException e) { + LOG.warn("Exception while incrementing revision id.", e); + throw new IOException("Exception while incrementing revision id. ", e); + } catch (InterruptedException e) { + LOG.warn("Exception while incrementing revision id.", e); + throw new IOException("Exception while incrementing revision id. ", e); + } + + return curId; + } + + /* + * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired() + */ + @Override + public void lockAcquired() { + + + } + + /* + * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased() + */ + @Override + public void lockReleased() { + + } + + +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java (revision 0) @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + + +/** + * The PathUtil class is a utility class to provide information about various + * znode paths. The following is the znode structure used for storing information. + * baseDir/ClockNode + * baseDir/TrasactionBasePath + * baseDir/TrasactionBasePath/TableA/revisionID + * baseDir/TrasactionBasePath/TableA/columnFamily-1 + * baseDir/TrasactionBasePath/TableA/columnFamily-1/runnningTxns + * baseDir/TrasactionBasePath/TableA/columnFamily-1/abortedTxns + * baseDir/TrasactionBasePath/TableB/revisionID + * baseDir/TrasactionBasePath/TableB/columnFamily-1 + * baseDir/TrasactionBasePath/TableB/columnFamily-1/runnningTxns + * baseDir/TrasactionBasePath/TableB/columnFamily-1/abortedTxns + + */ +public class PathUtil{ + + static final String DATA_DIR = "/data"; + static final String CLOCK_NODE = "/clock"; + + /** + * This method returns the data path associated with the currently + * running transactions of a given table and column/column family. + * @param baseDir + * @param tableName + * @param columnFamily + * @return The path of the running transactions data. + */ + static String getRunningTxnInfoPath(String baseDir, String tableName, + String columnFamily) { + String txnBasePath = getTransactionBasePath(baseDir); + String path = txnBasePath + "/" + tableName + "/" + columnFamily + + "/runningTxns"; + return path; + } + + /** + * This method returns the data path associated with the aborted + * transactions of a given table and column/column family. + * @param baseDir The base directory for revision management. + * @param tableName The name of the table. + * @param columnFamily + * @return The path of the aborted transactions data. + */ + static String getAbortInformationPath(String baseDir, String tableName, + String columnFamily) { + String txnBasePath = getTransactionBasePath(baseDir); + String path = txnBasePath + "/" + tableName + "/" + columnFamily + + "/abortData"; + return path; + } + + /** + * Gets the revision id node for a given table. + * + * @param baseDir the base dir for revision management. + * @param tableName the table name + * @return the revision id node path. + */ + static String getRevisionIDNode(String baseDir, String tableName) { + String rmBasePath = getTransactionBasePath(baseDir); + String revisionIDNode = rmBasePath + "/" + tableName + "/idgen"; + return revisionIDNode; + } + + /** + * Gets the lock management node for any znode that needs to be locked. + * + * @param path the path of the znode. + * @return the lock management node path. + */ + static String getLockManagementNode(String path) { + String lockNode = path + "_locknode_"; + return lockNode; + } + + /** + * This method returns the base path for the transaction data. + * + * @param baseDir The base dir for revision management. + * @return The base path for the transaction data. + */ + static String getTransactionBasePath(String baseDir) { + String txnBaseNode = baseDir + DATA_DIR; + return txnBaseNode; + } + + /** + * Gets the txn data path for a given table. + * + * @param baseDir the base dir for revision management. + * @param tableName the table name + * @return the txn data path for the table. + */ + static String getTxnDataPath(String baseDir, String tableName){ + String txnBasePath = getTransactionBasePath(baseDir); + String path = txnBasePath + "/" + tableName; + return path; + } + + /** + * This method returns the data path for clock node. + * + * @param baseDir + * @return The data path for clock. + */ + static String getClockPath(String baseDir) { + String clockNode = baseDir + CLOCK_NODE; + return clockNode; + } +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (revision 0) @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * This interface provides APIs for implementing revision management. + */ +public interface RevisionManager { + + public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class"; + + /** + * Initialize the revision manager. + */ + public void initialize(Properties properties); + + /** + * Opens the revision manager. + * + * @throws IOException + */ + public void open() throws IOException; + + /** + * Closes the revision manager. + * + * @throws IOException + */ + public void close() throws IOException; + + /** + * Start the write transaction. + * + * @param table + * @param families + * @return + * @throws IOException + */ + public Transaction beginWriteTransaction(String table, List families) + throws IOException; + + /** + * Start the write transaction. + * + * @param table + * @param families + * @param keepAlive + * @return + * @throws IOException + */ + public Transaction beginWriteTransaction(String table, + List families, long keepAlive) throws IOException; + + /** + * Commit the write transaction. + * + * @param transaction + * @throws IOException + */ + public void commitWriteTransaction(Transaction transaction) + throws IOException; + + /** + * Abort the write transaction. + * + * @param transaction + * @throws IOException + */ + public void abortWriteTransaction(Transaction transaction) + throws IOException; + + /** + * Create the latest snapshot of the table. + * + * @param tableName + * @return + * @throws IOException + */ + public TableSnapshot createSnapshot(String tableName) throws IOException; + + /** + * Create the snapshot of the table using the revision number. + * + * @param tableName + * @param revision + * @return + * @throws IOException + */ + public TableSnapshot createSnapshot(String tableName, long revision) + throws IOException; + + /** + * Extends the expiration of a transaction by the time indicated by keep alive. + * + * @param transaction + * @throws IOException + */ + public void keepAlive(Transaction transaction) throws IOException; + +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (revision 0) @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.io.IOException; +import java.util.Properties; + +public class RevisionManagerFactory { + + /** + * Gets an instance of revision manager. + * + * @param properties The properties required to created the revision manager. + * @return the revision manager An instance of revision manager. + * @throws IOException Signals that an I/O exception has occurred. + */ + public static RevisionManager getRevisionManager(Properties properties) throws IOException{ + + RevisionManager revisionMgr; + ClassLoader classLoader = Thread.currentThread() + .getContextClassLoader(); + if (classLoader == null) { + classLoader = RevisionManagerFactory.class.getClassLoader(); + } + String className = properties.getProperty( + RevisionManager.REVISION_MGR_IMPL_CLASS, + ZKBasedRevisionManager.class.getName()); + try { + + @SuppressWarnings("unchecked") + Class revisionMgrClass = (Class) Class + .forName(className, true , classLoader); + revisionMgr = (RevisionManager) revisionMgrClass.newInstance(); + revisionMgr.initialize(properties); + } catch (ClassNotFoundException e) { + throw new IOException( + "The implementation class of revision manager not found.", + e); + } catch (InstantiationException e) { + throw new IOException( + "Exception encountered during instantiating revision manager implementation.", + e); + } catch (IllegalAccessException e) { + throw new IOException( + "IllegalAccessException encountered during instantiating revision manager implementation.", + e); + } catch (IllegalArgumentException e) { + throw new IOException( + "IllegalArgumentException encountered during instantiating revision manager implementation.", + e); + } + return revisionMgr; + } + +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (revision 0) @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * The snapshot for a table and a list of column families. + */ +public class TableSnapshot { + + private String name; + + private Map cfRevisionMap; + + + public TableSnapshot(String name, Map cfRevMap) { + this.name = name; + this.cfRevisionMap = cfRevMap; + } + + /** + * Gets the table name. + * + * @return String The name of the table. + */ + public String getTableName() { + return name; + } + + /** + * Gets the column families. + * + * @return List A list of column families associated with the snapshot. + */ + public List getColumnFamilies(){ + return new ArrayList(this.cfRevisionMap.keySet()); + } + + /** + * Gets the revision. + * + * @param familyName The name of the column family. + * @return the revision + */ + public long getRevision(String familyName){ + return this.cfRevisionMap.get(familyName); + } + + @Override + public String toString() { + String snapshot = "Table Name : " + name + + " Column Familiy revision : " + cfRevisionMap.toString(); + return snapshot; + } +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (revision 0) @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * This class is responsible for storing information related to + * transactions. + */ +public class Transaction implements Serializable { + + private String tableName; + private List columnFamilies = new ArrayList(); + private long timeStamp; + private long keepAlive; + private long revision; + + + Transaction(String tableName, List columnFamilies, long revision, long timestamp) { + this.tableName = tableName; + this.columnFamilies = columnFamilies; + this.timeStamp = timestamp; + this.revision = revision; + } + + /** + * @return The revision number associated with a transaction. + */ + public long getRevisionNumber(){ + return this.revision; + } + + /** + * @return The table name associated with a transaction. + */ + public String getTableName() { + return tableName; + } + + /** + * @return The column families associated with a transaction. + */ + public List getColumnFamilies() { + return columnFamilies; + } + + /** + * @return The expire timestamp associated with a transaction. + */ + long getTransactionExpireTimeStamp(){ + return this.timeStamp + this.keepAlive; + } + + void setKeepAlive(long seconds){ + this.keepAlive = seconds; + } + + /** + * Gets the keep alive value. + * + * @return long The keep alive value for the transaction. + */ + public long getKeepAliveValue(){ + return this.keepAlive; + } + + /** + * Gets the family revision info. + * + * @return FamilyRevision An instance of FamilyRevision associated with the transaction. + */ + FamilyRevision getFamilyRevisionInfo(){ + return new FamilyRevision(revision, getTransactionExpireTimeStamp()); + } + + /** + * Keep alive transaction. This methods extends the expire timestamp of a + * transaction by the "keep alive" amount. + */ + void keepAliveTransaction(){ + this.timeStamp = this.timeStamp + this.keepAlive; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Revision : "); + sb.append(this.getRevisionNumber()); + sb.append(" Timestamp : "); + sb.append(this.getTransactionExpireTimeStamp()); + sb.append("\n").append("Table : "); + sb.append(this.tableName).append("\n"); + sb.append("Column Families : "); + sb.append(this.columnFamilies.toString()); + return sb.toString(); + } +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (revision 0) @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hcatalog.hbase.snapshot.lock.LockListener; +import org.apache.hcatalog.hbase.snapshot.lock.WriteLock; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; + +/** + * The service for providing revision management to Hbase tables. + */ +public class ZKBasedRevisionManager implements RevisionManager{ + + public static final String HOSTLIST = "revision.manager.zk.HostList"; + public static final String DATADIR = "revision.manager.zk.DataDir"; + private static int DEFAULT_WRITE_TRANSACTION_TIMEOUT = 14400000; + private static final Log LOG = LogFactory.getLog(ZKBasedRevisionManager.class); + private String zkHostList; + private String baseDir; + private ZKUtil zkUtil; + + + /* + * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#initialize() + */ + @Override + public void initialize(Properties properties) { + this.zkHostList = properties.getProperty(ZKBasedRevisionManager.HOSTLIST, "localhost:2181"); + this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,"/revision-management"); + } + + /** + * Open a ZooKeeper connection + * @throws java.io.IOException + */ + + public void open() throws IOException { + zkUtil = new ZKUtil(zkHostList, this.baseDir); + zkUtil.createRootZNodes(); + LOG.info("Created root znodes for revision manager."); + } + + /** + * Close Zookeeper connection + */ + public void close() { + zkUtil.closeZKConnection(); + } + + private void checkInputParams(String table, List families) { + if (table == null) { + throw new IllegalArgumentException( + "The table name must be specified for reading."); + } + if (families == null || families.isEmpty()) { + throw new IllegalArgumentException( + "At least one column family should be specified for reading."); + } + } + + + /* @param table + /* @param families + /* @param keepAlive + /* @return + /* @throws IOException + * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long) + */ + public Transaction beginWriteTransaction(String table, + List families, long keepAlive) throws IOException { + + checkInputParams(table, families); + zkUtil.setUpZnodesForTable(table, families); + long nextId = zkUtil.nextId(table); + long expireTimestamp = zkUtil.getTimeStamp(); + Transaction transaction = new Transaction(table, families, nextId, + expireTimestamp); + if (keepAlive != -1) { + transaction.setKeepAlive(keepAlive); + } else { + transaction.setKeepAlive(DEFAULT_WRITE_TRANSACTION_TIMEOUT); + } + + refreshTransactionList(transaction.getTableName()); + String lockPath = prepareLockNode(table); + WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, + Ids.OPEN_ACL_UNSAFE); + RMLockListener myLockListener = new RMLockListener(); + wLock.setLockListener(myLockListener); + try { + boolean lockGrabbed = wLock.lock(); + if (lockGrabbed == false) { + //TO DO : Let this request queue up and try obtaining lock. + throw new IOException( + "Unable to obtain lock while beginning transaction. " + + transaction.toString()); + } else { + List colFamilies = transaction.getColumnFamilies(); + FamilyRevision revisionData = transaction.getFamilyRevisionInfo(); + for (String cfamily : colFamilies) { + String path = PathUtil.getRunningTxnInfoPath( + baseDir, table, cfamily); + zkUtil.updateData(path, revisionData, + ZKUtil.UpdateMode.APPEND); + } + } + } catch (KeeperException e) { + throw new IOException("Exception while obtaining lock.", e); + } catch (InterruptedException e) { + throw new IOException("Exception while obtaining lock.", e); + } + finally { + wLock.unlock(); + } + + return transaction; + } + + /* @param table The table name. + /* @param families The column families involved in the transaction. + /* @return transaction The transaction which was started. + /* @throws IOException + * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List) + */ + public Transaction beginWriteTransaction(String table, List families) + throws IOException { + return beginWriteTransaction(table, families, -1); + } + + /** + * This method commits a write transaction. + * @param transaction The revision information associated with transaction. + * @throws java.io.IOException + */ + public void commitWriteTransaction(Transaction transaction) throws IOException { + refreshTransactionList(transaction.getTableName()); + + String lockPath = prepareLockNode(transaction.getTableName()); + WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, + Ids.OPEN_ACL_UNSAFE); + RMLockListener myLockListener = new RMLockListener(); + wLock.setLockListener(myLockListener); + try { + boolean lockGrabbed = wLock.lock(); + if (lockGrabbed == false) { + //TO DO : Let this request queue up and try obtaining lock. + throw new IOException( + "Unable to obtain lock while commiting transaction. " + + transaction.toString()); + } else { + String tableName = transaction.getTableName(); + List colFamilies = transaction.getColumnFamilies(); + FamilyRevision revisionData = transaction.getFamilyRevisionInfo(); + for (String cfamily : colFamilies) { + String path = PathUtil.getRunningTxnInfoPath( + baseDir, tableName, cfamily); + zkUtil.updateData(path, revisionData, + ZKUtil.UpdateMode.REMOVE); + } + + } + } catch (KeeperException e) { + throw new IOException("Exception while obtaining lock.", e); + } catch (InterruptedException e) { + throw new IOException("Exception while obtaining lock.", e); + } + finally { + wLock.unlock(); + } + LOG.info("Write Transaction committed: " + transaction.toString()); + } + + /** + * This method aborts a write transaction. + * @param state the state associated with the Transaction + * @throws java.io.IOException + */ + public void abortWriteTransaction(Transaction transaction) throws IOException { + + refreshTransactionList(transaction.getTableName()); + String lockPath = prepareLockNode(transaction.getTableName()); + WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, + Ids.OPEN_ACL_UNSAFE); + RMLockListener myLockListener = new RMLockListener(); + wLock.setLockListener(myLockListener); + try { + boolean lockGrabbed = wLock.lock(); + if (lockGrabbed == false) { + //TO DO : Let this request queue up and try obtaining lock. + throw new IOException( + "Unable to obtain lock while aborting transaction. " + + transaction.toString()); + } else { + String tableName = transaction.getTableName(); + List colFamilies = transaction.getColumnFamilies(); + FamilyRevision revisionData = transaction + .getFamilyRevisionInfo(); + for (String cfamily : colFamilies) { + String path = PathUtil.getRunningTxnInfoPath( + baseDir, tableName, cfamily); + zkUtil.updateData(path, revisionData, + ZKUtil.UpdateMode.REMOVE); + path = PathUtil.getAbortInformationPath(baseDir, + tableName, cfamily); + zkUtil.updateData(path, revisionData, + ZKUtil.UpdateMode.APPEND); + } + + } + } catch (KeeperException e) { + throw new IOException("Exception while obtaining lock.", e); + } catch (InterruptedException e) { + throw new IOException("Exception while obtaining lock.", e); + } + finally { + wLock.unlock(); + } + LOG.info("Write Transaction aborted: " + transaction.toString()); + } + + + /* @param transaction + /* @throws IOException + * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction) + */ + public void keepAlive(Transaction transaction) + throws IOException { + + refreshTransactionList(transaction.getTableName()); + transaction.keepAliveTransaction(); + String lockPath = prepareLockNode(transaction.getTableName()); + WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, + Ids.OPEN_ACL_UNSAFE); + RMLockListener myLockListener = new RMLockListener(); + wLock.setLockListener(myLockListener); + try { + boolean lockGrabbed = wLock.lock(); + if (lockGrabbed == false) { + //TO DO : Let this request queue up and try obtaining lock. + throw new IOException( + "Unable to obtain lock for keep alive of transaction. " + + transaction.toString()); + }else { + String tableName = transaction.getTableName(); + List colFamilies = transaction.getColumnFamilies(); + FamilyRevision revisionData = transaction.getFamilyRevisionInfo(); + for (String cfamily : colFamilies) { + String path = PathUtil.getRunningTxnInfoPath( + baseDir, tableName, cfamily); + zkUtil.updateData(path, revisionData, + ZKUtil.UpdateMode.KEEP_ALIVE); + } + + } + } catch (KeeperException e) { + throw new IOException("Exception while obtaining lock.", e); + } catch (InterruptedException e) { + throw new IOException("Exception while obtaining lock.", e); + }finally { + wLock.unlock(); + } + + } + + /* This method allows the user to create latest snapshot of a + /* table. + /* @param tableName The table whose snapshot is being created. + /* @return TableSnapshot An instance of TableSnaphot + /* @throws IOException + * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String) + */ + public TableSnapshot createSnapshot(String tableName) throws IOException{ + refreshTransactionList(tableName); + long latestID = zkUtil.currentID(tableName); + HashMap cfMap = new HashMap(); + List columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName); + + for(String cfName: columnFamilyNames){ + String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName); + List tranxList = zkUtil.getTransactionList(cfPath); + long version; + if (!tranxList.isEmpty()) { + Collections.sort(tranxList); + // get the smallest running Transaction ID + long runningVersion = tranxList.get(0).getRevision(); + version = runningVersion -1; + } else { + version = latestID; + } + cfMap.put(cfName, version); + } + + return new TableSnapshot(tableName, cfMap); + } + + /* This method allows the user to create snapshot of a + /* table with a given revision number. + /* @param tableName + /* @param revision + /* @return TableSnapshot + /* @throws IOException + * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String, long) + */ + public TableSnapshot createSnapshot(String tableName, long revision) throws IOException{ + + long currentID = zkUtil.currentID(tableName); + if (revision > currentID) { + throw new IOException( + "The revision specified in the snapshot is higher than the current revision of the table."); + } + refreshTransactionList(tableName); + HashMap cfMap = new HashMap(); + List columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName); + + for(String cf: columnFamilies){ + cfMap.put(cf, revision); + } + + return new TableSnapshot(tableName, cfMap); + } + + /** + * Get the list of in-progress Transactions for a column family + * @param table the table name + * @param columnFamily the column family name + * @return a list of in-progress WriteTransactions + * @throws java.io.IOException + */ + List getRunningTransactions(String table, + String columnFamily) throws IOException { + String path = PathUtil.getRunningTxnInfoPath(baseDir, table, + columnFamily); + return zkUtil.getTransactionList(path); + } + + /** + * Get the list of aborted Transactions for a column family + * @param table the table name + * @param columnFamily the column family name + * @return a list of aborted WriteTransactions + * @throws java.io.IOException + */ + List getAbortedWriteTransactions(String table, + String columnFamily) throws IOException { + String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily); + return zkUtil.getTransactionList(path); + } + + private void refreshTransactionList(String tableName) throws IOException{ + String lockPath = prepareLockNode(tableName); + WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, + Ids.OPEN_ACL_UNSAFE); + RMLockListener myLockListener = new RMLockListener(); + wLock.setLockListener(myLockListener); + try { + boolean lockGrabbed = wLock.lock(); + if (lockGrabbed == false) { + //TO DO : Let this request queue up and try obtaining lock. + throw new IOException( + "Unable to obtain lock while refreshing transactions of table " + + tableName + "."); + }else { + List cfPaths = zkUtil + .getColumnFamiliesOfTable(tableName); + for (String cf : cfPaths) { + String runningDataPath = PathUtil.getRunningTxnInfoPath( + baseDir, tableName, cf); + zkUtil.refreshTransactions(runningDataPath); + } + + } + } catch (KeeperException e) { + throw new IOException("Exception while obtaining lock.", e); + } catch (InterruptedException e) { + throw new IOException("Exception while obtaining lock.", e); + } finally { + wLock.unlock(); + } + + } + + private String prepareLockNode(String tableName) throws IOException{ + String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName); + String lockPath = PathUtil.getLockManagementNode(txnDataPath); + zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + return lockPath; + } + + + /* + * This class is a listener class for the locks used in revision management. + * TBD: Use the following class to signal that that the lock is actually + * been granted. + */ + class RMLockListener implements LockListener { + + /* + * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired() + */ + @Override + public void lockAcquired() { + + } + + /* + * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased() + */ + @Override + public void lockReleased() { + + } + + } + + +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (revision 0) @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +class ZKUtil { + + private int DEFAULT_SESSION_TIMEOUT = 1000000; + private ZooKeeper zkSession; + private String baseDir; + private String connectString; + private static final Log LOG = LogFactory.getLog(ZKUtil.class); + + static enum UpdateMode { + APPEND, REMOVE, KEEP_ALIVE + }; + + ZKUtil(String connection, String baseDir) { + this.connectString = connection; + this.baseDir = baseDir; + } + + /** + * This method creates znodes related to table. + * + * @param table The name of the table. + * @param families The list of column families of the table. + * @throws IOException + */ + void setUpZnodesForTable(String table, List families) + throws IOException { + + String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir, table); + ensurePathExists(transactionDataTablePath, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + for (String cf : families) { + String runningDataPath = PathUtil.getRunningTxnInfoPath( + this.baseDir, table, cf); + ensurePathExists(runningDataPath, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + String abortDataPath = PathUtil.getAbortInformationPath( + this.baseDir, table, cf); + ensurePathExists(abortDataPath, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + + } + + /** + * This method ensures that a given path exists in zookeeper. If the path + * does not exists, it creates one. + * + * @param path The path of znode that is required to exist. + * @param data The data to be associated with the znode. + * @param acl The ACLs required. + * @param flags The CreateMode for the znode. + * @throws IOException + */ + void ensurePathExists(String path, byte[] data, List acl, + CreateMode flags) throws IOException { + String[] dirs = path.split("/"); + String parentPath = ""; + for (String subDir : dirs) { + if (subDir.equals("") == false) { + parentPath = parentPath + "/" + subDir; + try { + Stat stat = getSession().exists(parentPath, false); + if (stat == null) { + getSession().create(parentPath, data, acl, flags); + } + } catch (Exception e) { + throw new IOException("Exception while creating path " + + parentPath, e); + } + } + } + + } + + /** + * This method returns a list of columns of a table which were used in any + * of the transactions. + * + * @param tableName The name of table. + * @return List The list of column families in table. + * @throws IOException + */ + List getColumnFamiliesOfTable(String tableName) throws IOException { + String path = PathUtil.getTxnDataPath(baseDir, tableName); + List children = null; + List columnFamlies = new ArrayList(); + try { + children = getSession().getChildren(path, false); + } catch (KeeperException e) { + LOG.warn("Caught: ", e); + throw new IOException("Exception while obtaining columns of table.",e); + } catch (InterruptedException e) { + LOG.warn("Caught: ", e); + throw new IOException("Exception while obtaining columns of table.",e); + } + + for (String child : children) { + if ((child.contains("idgen") == false) + && (child.contains("_locknode_") == false)) { + columnFamlies.add(child); + } + } + return columnFamlies; + } + + /** + * This method returns a time stamp for use by the transactions. + * + * @return long The current timestamp in zookeeper. + * @throws IOException + */ + long getTimeStamp() throws IOException { + long timeStamp; + Stat stat; + String clockPath = PathUtil.getClockPath(this.baseDir); + ensurePathExists(clockPath, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + try { + getSession().exists(clockPath, false); + stat = getSession().setData(clockPath, null, -1); + + } catch (KeeperException e) { + LOG.warn("Caught: ", e); + throw new IOException("Exception while obtaining timestamp ", e); + } catch (InterruptedException e) { + LOG.warn("Caught: ", e); + throw new IOException("Exception while obtaining timestamp ", e); + } + timeStamp = stat.getMtime(); + return timeStamp; + } + + /** + * This method returns the next revision number to be used for any + * transaction purposes. + * + * @param tableName The name of the table. + * @return revision number The revision number last used by any transaction. + * @throws IOException + */ + long nextId(String tableName) throws IOException { + String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName); + ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + String lockNode = PathUtil.getLockManagementNode(idNode); + ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + IDGenerator idf = new IDGenerator(getSession(), tableName, idNode); + long id = idf.obtainID(); + return id; + } + + /** + * The latest used revision id of the table. + * + * @param tableName The name of the table. + * @return the long The revision number to use by any transaction. + * @throws IOException Signals that an I/O exception has occurred. + */ + long currentID(String tableName) throws IOException{ + String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName); + ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + String lockNode = PathUtil.getLockManagementNode(idNode); + ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + IDGenerator idf = new IDGenerator(getSession(), tableName, idNode); + long id = idf.readID(); + return id; + } + + /** + * This methods retrieves the list of transaction information associated + * with each column/column family of a table. + * + * @param path The znode path + * @return List of FamilyRevision The list of transactions in the given path. + * @throws IOException + */ + List getTransactionList(String path) + throws IOException { + + byte[] data = getRawData(path, new Stat()); + ArrayList wtxnList = new ArrayList(); + if (data == null) { + return wtxnList; + } + StoreFamilyRevisionList txnList = new StoreFamilyRevisionList(); + deserialize(txnList, data); + Iterator itr = txnList.getRevisionListIterator(); + + while (itr.hasNext()) { + StoreFamilyRevision wtxn = itr.next(); + wtxnList.add(new FamilyRevision(wtxn.getRevision(), wtxn + .getTimestamp())); + } + + return wtxnList; + } + + /** + * This method returns the data associated with the path in zookeeper. + * + * @param path The znode path + * @param stat Zookeeper stat + * @return byte array The data stored in the znode. + * @throws IOException + */ + byte[] getRawData(String path, Stat stat) throws IOException { + byte[] data = null; + try { + data = getSession().getData(path, false, stat); + } catch (Exception e) { + throw new IOException( + "Exception while obtaining raw data from zookeeper path " + + path, e); + } + return data; + } + + /** + * This method created the basic znodes in zookeeper for revision + * management. + * + * @throws IOException + */ + void createRootZNodes() throws IOException { + String txnBaseNode = PathUtil.getTransactionBasePath(this.baseDir); + String clockNode = PathUtil.getClockPath(this.baseDir); + ensurePathExists(txnBaseNode, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + ensurePathExists(clockNode, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + + /** + * This method closes the zookeeper session. + */ + void closeZKConnection() { + if (zkSession != null) { + try { + zkSession.close(); + } catch (InterruptedException e) { + LOG.warn("Close failed: ", e); + } + zkSession = null; + LOG.info("Disconnected to ZooKeeper"); + } + } + + /** + * This method returns a zookeeper session. If the current session is closed, + * then a new session is created. + * + * @return ZooKeeper An instance of zookeeper client. + * @throws IOException + */ + ZooKeeper getSession() throws IOException { + if (zkSession == null || zkSession.getState() == States.CLOSED) { + synchronized (this) { + if (zkSession == null || zkSession.getState() == States.CLOSED) { + zkSession = new ZooKeeper(this.connectString, + this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher()); + } + } + } + return zkSession; + } + + /** + * This method updates the transaction data related to a znode. + * + * @param String The path to the transaction data. + * @param FamilyRevision The FamilyRevision to be updated. + * @param UpdateMode The mode to update like append, update, remove. + * @throws IOException + */ + void updateData(String path, FamilyRevision updateTx, UpdateMode mode) + throws IOException { + + if (updateTx == null) { + throw new IOException( + "The transaction to be updated found to be null."); + } + List currentData = getTransactionList(path); + List newData = new ArrayList(); + boolean dataFound = false; + long updateVersion = updateTx.getRevision(); + for (FamilyRevision tranx : currentData) { + if (tranx.getRevision() != updateVersion) { + newData.add(tranx); + } else { + dataFound = true; + } + } + switch (mode) { + case REMOVE: + if (dataFound == false) { + throw new IOException( + "The transaction to be removed not found in the data."); + } + LOG.info("Removed trasaction : " + updateTx.toString()); + break; + case KEEP_ALIVE: + if (dataFound == false) { + throw new IOException( + "The transaction to be kept alove not found in the data. It might have been expired."); + } + newData.add(updateTx); + LOG.info("keep alive of transaction : " + updateTx.toString()); + break; + case APPEND: + if (dataFound == true) { + throw new IOException( + "The data to be appended already exists."); + } + newData.add(updateTx); + LOG.info("Added transaction : " + updateTx.toString()); + break; + } + + // For serialization purposes. + List newTxnList = new ArrayList(); + for (FamilyRevision wtxn : newData) { + StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(), + wtxn.getExpireTimestamp()); + newTxnList.add(newTxn); + } + StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList); + byte[] newByteData = serialize(wtxnList); + + Stat stat = null; + try { + stat = zkSession.setData(path, newByteData, -1); + } catch (KeeperException e) { + throw new IOException( + "Exception while updating trasactional data. ", e); + } catch (InterruptedException e) { + throw new IOException( + "Exception while updating trasactional data. ", e); + } + + if (stat != null) { + LOG.info("Transaction list stored at " + path + "."); + } + + } + + /** + * Refresh transactions on a given transaction data path. + * + * @param path The path to the transaction data. + * @throws IOException Signals that an I/O exception has occurred. + */ + void refreshTransactions(String path) throws IOException{ + List currentData = getTransactionList(path); + List newData = new ArrayList(); + + for (FamilyRevision tranx : currentData) { + if (tranx.getExpireTimestamp() > getTimeStamp()) { + newData.add(tranx); + } + } + + if(newData.equals(currentData) == false){ + List newTxnList = new ArrayList(); + for (FamilyRevision wtxn : newData) { + StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(), + wtxn.getExpireTimestamp()); + newTxnList.add(newTxn); + } + StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList); + byte[] newByteData = serialize(wtxnList); + + try { + zkSession.setData(path, newByteData, -1); + } catch (KeeperException e) { + throw new IOException( + "Exception while updating trasactional data. ", e); + } catch (InterruptedException e) { + throw new IOException( + "Exception while updating trasactional data. ", e); + } + + } + + } + + /** + * This method serializes a given instance of TBase object. + * + * @param obj An instance of TBase + * @return byte array The serialized data. + * @throws IOException + */ + static byte[] serialize(TBase obj) throws IOException { + if (obj == null) + return new byte[0]; + try { + TSerializer serializer = new TSerializer( + new TBinaryProtocol.Factory()); + byte[] bytes = serializer.serialize(obj); + return bytes; + } catch (Exception e) { + throw new IOException("Serialization error: ", e); + } + } + + + /** + * This method deserializes the given byte array into the TBase object. + * + * @param obj An instance of TBase + * @param data Output of deserialization. + * @throws IOException + */ + static void deserialize(TBase obj, byte[] data) throws IOException { + if (data == null || data.length == 0) + return; + try { + TDeserializer deserializer = new TDeserializer( + new TBinaryProtocol.Factory()); + deserializer.deserialize(obj, data); + } catch (Exception e) { + throw new IOException("Deserialization error: " + e.getMessage(), e); + } + } + + private class ZKWatcher implements Watcher { + public void process(WatchedEvent event) { + switch (event.getState()) { + case Expired: + LOG.info("The client session has expired. Try opening a new " + + "session and connecting again."); + zkSession = null; + break; + default: + + } + } + } + +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java (revision 0) @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot.lock; + +/** + * This class has two methods which are call + * back methods when a lock is acquired and + * when the lock is released. + * This class has been used as-is from the zookeeper 3.3.4 recipes minor changes + * in the package name. + */ +public interface LockListener { + /** + * call back called when the lock + * is acquired + */ + public void lockAcquired(); + + /** + * call back called when the lock is + * released. + */ + public void lockReleased(); +} \ No newline at end of file Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java (revision 0) @@ -0,0 +1,193 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot.lock; + +import org.apache.hcatalog.hbase.snapshot.lock.ZooKeeperOperation; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A base class for protocol implementations which provides a number of higher + * level helper methods for working with ZooKeeper along with retrying synchronous + * operations if the connection to ZooKeeper closes such as + * {@link #retryOperation(ZooKeeperOperation)} + * This class has been used as-is from the zookeeper 3.4.0 recipes with + * changes in the retry delay, retry count values and package name. + */ +class ProtocolSupport { + private static final Logger LOG = Logger.getLogger(ProtocolSupport.class); + + protected final ZooKeeper zookeeper; + private AtomicBoolean closed = new AtomicBoolean(false); + private long retryDelay = 500L; + private int retryCount = 3; + private List acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + + public ProtocolSupport(ZooKeeper zookeeper) { + this.zookeeper = zookeeper; + } + + /** + * Closes this strategy and releases any ZooKeeper resources; but keeps the + * ZooKeeper instance open + */ + public void close() { + if (closed.compareAndSet(false, true)) { + doClose(); + } + } + + /** + * return zookeeper client instance + * @return zookeeper client instance + */ + public ZooKeeper getZookeeper() { + return zookeeper; + } + + /** + * return the acl its using + * @return the acl. + */ + public List getAcl() { + return acl; + } + + /** + * set the acl + * @param acl the acl to set to + */ + public void setAcl(List acl) { + this.acl = acl; + } + + /** + * get the retry delay in milliseconds + * @return the retry delay + */ + public long getRetryDelay() { + return retryDelay; + } + + /** + * Sets the time waited between retry delays + * @param retryDelay the retry delay + */ + public void setRetryDelay(long retryDelay) { + this.retryDelay = retryDelay; + } + + /** + * Allow derived classes to perform + * some custom closing operations to release resources + */ + protected void doClose() { + } + + + /** + * Perform the given operation, retrying if the connection fails + * @return object. it needs to be cast to the callee's expected + * return type. + */ + protected Object retryOperation(ZooKeeperOperation operation) + throws KeeperException, InterruptedException { + KeeperException exception = null; + for (int i = 0; i < retryCount; i++) { + try { + return operation.execute(); + } catch (KeeperException.SessionExpiredException e) { + LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e); + throw e; + } catch (KeeperException.ConnectionLossException e) { + if (exception == null) { + exception = e; + } + LOG.debug("Attempt " + i + " failed with connection loss so " + + "attempting to reconnect: " + e, e); + retryDelay(i); + } + } + throw exception; + } + + /** + * Ensures that the given path exists with no data, the current + * ACL and no flags + * @param path + */ + protected void ensurePathExists(String path) { + ensureExists(path, null, acl, CreateMode.PERSISTENT); + } + + /** + * Ensures that the given path exists with the given data, ACL and flags + * @param path + * @param acl + * @param flags + */ + protected void ensureExists(final String path, final byte[] data, + final List acl, final CreateMode flags) { + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + Stat stat = zookeeper.exists(path, false); + if (stat != null) { + return true; + } + zookeeper.create(path, data, acl, flags); + return true; + } + }); + } catch (KeeperException e) { + LOG.warn("Caught: " + e, e); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + } + } + + /** + * Returns true if this protocol has been closed + * @return true if this protocol is closed + */ + protected boolean isClosed() { + return closed.get(); + } + + /** + * Performs a retry delay if this is not the first attempt + * @param attemptCount the number of the attempts performed so far + */ + protected void retryDelay(int attemptCount) { + if (attemptCount > 0) { + try { + Thread.sleep(attemptCount * retryDelay); + } catch (InterruptedException e) { + LOG.debug("Failed to sleep: " + e, e); + } + } + } +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java (revision 0) @@ -0,0 +1,297 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot.lock; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * A protocol to implement an exclusive + * write lock or to elect a leader.

You invoke {@link #lock()} to + * start the process of grabbing the lock; you may get the lock then or it may be + * some time later.

You can register a listener so that you are invoked + * when you get the lock; otherwise you can ask if you have the lock + * by calling {@link #isOwner()} + * This class has been used as-is from the zookeeper 3.4.0 recipes. The only change + * made is a TODO for sorting using suffixes and the package name. + */ +public class WriteLock extends ProtocolSupport { + private static final Logger LOG = Logger.getLogger(WriteLock.class); + + private final String dir; + private String id; + private ZNodeName idName; + private String ownerId; + private String lastChildId; + private byte[] data = {0x12, 0x34}; + private LockListener callback; + private LockZooKeeperOperation zop; + + /** + * zookeeper contructor for writelock + * @param zookeeper zookeeper client instance + * @param dir the parent path you want to use for locking + * @param acls the acls that you want to use for all the paths, + * if null world read/write is used. + */ + public WriteLock(ZooKeeper zookeeper, String dir, List acl) { + super(zookeeper); + this.dir = dir; + if (acl != null) { + setAcl(acl); + } + this.zop = new LockZooKeeperOperation(); + } + + /** + * zookeeper contructor for writelock with callback + * @param zookeeper the zookeeper client instance + * @param dir the parent path you want to use for locking + * @param acl the acls that you want to use for all the paths + * @param callback the call back instance + */ + public WriteLock(ZooKeeper zookeeper, String dir, List acl, + LockListener callback) { + this(zookeeper, dir, acl); + this.callback = callback; + } + + /** + * return the current locklistener + * @return the locklistener + */ + public LockListener getLockListener() { + return this.callback; + } + + /** + * register a different call back listener + * @param callback the call back instance + */ + public void setLockListener(LockListener callback) { + this.callback = callback; + } + + /** + * Removes the lock or associated znode if + * you no longer require the lock. this also + * removes your request in the queue for locking + * in case you do not already hold the lock. + * @throws RuntimeException throws a runtime exception + * if it cannot connect to zookeeper. + */ + public synchronized void unlock() throws RuntimeException { + + if (!isClosed() && id != null) { + // we don't need to retry this operation in the case of failure + // as ZK will remove ephemeral files and we don't wanna hang + // this process when closing if we cannot reconnect to ZK + try { + + ZooKeeperOperation zopdel = new ZooKeeperOperation() { + public boolean execute() throws KeeperException, + InterruptedException { + zookeeper.delete(id, -1); + return Boolean.TRUE; + } + }; + zopdel.execute(); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + //set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + } catch (KeeperException e) { + LOG.warn("Caught: " + e, e); + throw (RuntimeException) new RuntimeException(e.getMessage()). + initCause(e); + } + finally { + if (callback != null) { + callback.lockReleased(); + } + id = null; + } + } + } + + /** + * the watcher called on + * getting watch while watching + * my predecessor + */ + private class LockWatcher implements Watcher { + public void process(WatchedEvent event) { + // lets either become the leader or watch the new/updated node + LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + + event.getState() + " type " + event.getType()); + try { + lock(); + } catch (Exception e) { + LOG.warn("Failed to acquire lock: " + e, e); + } + } + } + + /** + * a zoookeeper operation that is mainly responsible + * for all the magic required for locking. + */ + private class LockZooKeeperOperation implements ZooKeeperOperation { + + /** find if we have been created earler if not create our node + * + * @param prefix the prefix node + * @param zookeeper teh zookeeper client + * @param dir the dir paretn + * @throws KeeperException + * @throws InterruptedException + */ + private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) + throws KeeperException, InterruptedException { + List names = zookeeper.getChildren(dir, false); + for (String name : names) { + if (name.startsWith(prefix)) { + id = name; + if (LOG.isDebugEnabled()) { + LOG.debug("Found id created last time: " + id); + } + break; + } + } + if (id == null) { + id = zookeeper.create(dir + "/" + prefix, data, + getAcl(), EPHEMERAL_SEQUENTIAL); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created id: " + id); + } + } + + } + + /** + * the command that is run and retried for actually + * obtaining the lock + * @return if the command was successful or not + */ + public boolean execute() throws KeeperException, InterruptedException { + do { + if (id == null) { + long sessionId = zookeeper.getSessionId(); + String prefix = "x-" + sessionId + "-"; + // lets try look up the current ID if we failed + // in the middle of creating the znode + findPrefixInChildren(prefix, zookeeper, dir); + idName = new ZNodeName(id); + } + if (id != null) { + List names = zookeeper.getChildren(dir, false); + if (names.isEmpty()) { + LOG.warn("No children in: " + dir + " when we've just " + + "created one! Lets recreate it..."); + // lets force the recreation of the id + id = null; + } else { + // lets sort them explicitly (though they do seem to come back in order ususally :) + SortedSet sortedNames = new TreeSet(); + for (String name : names) { + //TODO: Just use the suffix to sort. + sortedNames.add(new ZNodeName(dir + "/" + name)); + } + ownerId = sortedNames.first().getName(); + SortedSet lessThanMe = sortedNames.headSet(idName); + if (!lessThanMe.isEmpty()) { + ZNodeName lastChildName = lessThanMe.last(); + lastChildId = lastChildName.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("watching less than me node: " + lastChildId); + } + Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); + if (stat != null) { + return Boolean.FALSE; + } else { + LOG.warn("Could not find the" + + " stats for less than me: " + lastChildName.getName()); + } + } else { + if (isOwner()) { + if (callback != null) { + callback.lockAcquired(); + } + return Boolean.TRUE; + } + } + } + } + } + while (id == null); + return Boolean.FALSE; + } + }; + + /** + * Attempts to acquire the exclusive write lock returning whether or not it was + * acquired. Note that the exclusive lock may be acquired some time later after + * this method has been invoked due to the current lock owner going away. + */ + public synchronized boolean lock() throws KeeperException, InterruptedException { + if (isClosed()) { + return false; + } + ensurePathExists(dir); + + return (Boolean) retryOperation(zop); + } + + /** + * return the parent dir for lock + * @return the parent dir used for locks. + */ + public String getDir() { + return dir; + } + + /** + * Returns true if this node is the owner of the + * lock (or the leader) + */ + public boolean isOwner() { + return id != null && ownerId != null && id.equals(ownerId); + } + + /** + * return the id for this lock + * @return the id for this lock + */ + public String getId() { + return this.id; + } +} + Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java (revision 0) @@ -0,0 +1,110 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot.lock; + +import org.apache.log4j.Logger; + +/** + * Represents an ephemeral znode name which has an ordered sequence number + * and can be sorted in order + * This class has been used as-is from the zookeeper 3.4.0 recipes with a + * change in package name. + */ +public class ZNodeName implements Comparable { + private final String name; + private String prefix; + private int sequence = -1; + private static final Logger LOG = Logger.getLogger(ZNodeName.class); + + public ZNodeName(String name) { + if (name == null) { + throw new NullPointerException("id cannot be null"); + } + this.name = name; + this.prefix = name; + int idx = name.lastIndexOf('-'); + if (idx >= 0) { + this.prefix = name.substring(0, idx); + try { + this.sequence = Integer.parseInt(name.substring(idx + 1)); + // If an exception occurred we misdetected a sequence suffix, + // so return -1. + } catch (NumberFormatException e) { + LOG.info("Number format exception for " + idx, e); + } catch (ArrayIndexOutOfBoundsException e) { + LOG.info("Array out of bounds for " + idx, e); + } + } + } + + @Override + public String toString() { + return name.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ZNodeName sequence = (ZNodeName) o; + + if (!name.equals(sequence.name)) return false; + + return true; + } + + @Override + public int hashCode() { + return name.hashCode() + 37; + } + + public int compareTo(ZNodeName that) { + int answer = this.prefix.compareTo(that.prefix); + if (answer == 0) { + int s1 = this.sequence; + int s2 = that.sequence; + if (s1 == -1 && s2 == -1) { + return this.name.compareTo(that.name); + } + answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2; + } + return answer; + } + + /** + * Returns the name of the znode + */ + public String getName() { + return name; + } + + /** + * Returns the sequence number + */ + public int getZNodeName() { + return sequence; + } + + /** + * Returns the text prefix before the sequence number + */ + public String getPrefix() { + return prefix; + } +} Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java =================================================================== --- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java (revision 0) +++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java (revision 0) @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot.lock; + +import org.apache.zookeeper.KeeperException; + +/** + * A callback object which can be used for implementing retry-able operations in the + * {@link org.apache.hcatalog.hbase.snapshot.lock.ProtocolSupport} class + * This class has been used as-is from the zookeeper 3.4.0 with change in the + * package name . + */ +public interface ZooKeeperOperation { + + /** + * Performs the operation - which may be involved multiple times if the connection + * to ZooKeeper closes during this operation + * + * @return the result of the operation or null + * @throws KeeperException + * @throws InterruptedException + */ + public boolean execute() throws KeeperException, InterruptedException; +} Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java =================================================================== --- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java (revision 0) +++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java (revision 0) @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot.lock; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +/** + * test for writelock + * This class is taken from the zookeeper 3.4.0 as-is to test the zookeeper lock + * Recipe with a change in the package name. + */ +public class WriteLockTest extends ClientBase { + protected int sessionTimeout = 10 * 1000; + protected String dir = "/" + getClass().getName(); + protected WriteLock[] nodes; + protected CountDownLatch latch = new CountDownLatch(1); + private boolean restartServer = true; + private boolean workAroundClosingLastZNodeFails = true; + private boolean killLeader = true; + + @Test + public void testRun() throws Exception { + runTest(3); + } + + class LockCallback implements LockListener { + public void lockAcquired() { + latch.countDown(); + } + + public void lockReleased() { + + } + + } + protected void runTest(int count) throws Exception { + nodes = new WriteLock[count]; + for (int i = 0; i < count; i++) { + ZooKeeper keeper = createClient(); + WriteLock leader = new WriteLock(keeper, dir, null); + leader.setLockListener(new LockCallback()); + nodes[i] = leader; + + leader.lock(); + } + + // lets wait for any previous leaders to die and one of our new + // nodes to become the new leader + latch.await(30, TimeUnit.SECONDS); + + WriteLock first = nodes[0]; + dumpNodes(count); + + // lets assert that the first election is the leader + Assert.assertTrue("The first znode should be the leader " + first.getId(), first.isOwner()); + + for (int i = 1; i < count; i++) { + WriteLock node = nodes[i]; + Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner()); + } + + if (count > 1) { + if (killLeader) { + System.out.println("Now killing the leader"); + // now lets kill the leader + latch = new CountDownLatch(1); + first.unlock(); + latch.await(30, TimeUnit.SECONDS); + //Thread.sleep(10000); + WriteLock second = nodes[1]; + dumpNodes(count); + // lets assert that the first election is the leader + Assert.assertTrue("The second znode should be the leader " + second.getId(), second.isOwner()); + + for (int i = 2; i < count; i++) { + WriteLock node = nodes[i]; + Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner()); + } + } + + + if (restartServer) { + // now lets stop the server + System.out.println("Now stopping the server"); + stopServer(); + Thread.sleep(10000); + + // TODO lets assert that we are no longer the leader + dumpNodes(count); + + System.out.println("Starting the server"); + startServer(); + Thread.sleep(10000); + + for (int i = 0; i < count - 1; i++) { + System.out.println("Calling acquire for node: " + i); + nodes[i].lock(); + } + dumpNodes(count); + System.out.println("Now closing down..."); + } + } + } + + protected void dumpNodes(int count) { + for (int i = 0; i < count; i++) { + WriteLock node = nodes[i]; + System.out.println("node: " + i + " id: " + + node.getId() + " is leader: " + node.isOwner()); + } + } + + @After + public void tearDown() throws Exception { + if (nodes != null) { + for (int i = 0; i < nodes.length; i++) { + WriteLock node = nodes[i]; + if (node != null) { + System.out.println("Closing node: " + i); + node.close(); + if (workAroundClosingLastZNodeFails && i == nodes.length - 1) { + System.out.println("Not closing zookeeper: " + i + " due to bug!"); + } else { + System.out.println("Closing zookeeper: " + i); + node.getZookeeper().close(); + System.out.println("Closed zookeeper: " + i); + } + } + } + } + System.out.println("Now lets stop the server"); + super.tearDown(); + + } +} Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java =================================================================== --- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java (revision 0) +++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java (revision 0) @@ -0,0 +1,60 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot.lock; + +import junit.framework.TestCase; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.junit.Test; + +/** + * test for znodenames. This class is taken as-is from zookeeper lock recipe test. + * The package name has been changed. + */ +public class ZNodeNameTest extends TestCase { + @Test + public void testOrderWithSamePrefix() throws Exception { + String[] names = { "x-3", "x-5", "x-11", "x-1" }; + String[] expected = { "x-1", "x-3", "x-5", "x-11" }; + assertOrderedNodeNames(names, expected); + } + @Test + public void testOrderWithDifferentPrefixes() throws Exception { + String[] names = { "r-3", "r-2", "r-1", "w-2", "w-1" }; + String[] expected = { "r-1", "r-2", "r-3", "w-1", "w-2" }; + assertOrderedNodeNames(names, expected); + } + + protected void assertOrderedNodeNames(String[] names, String[] expected) { + int size = names.length; + assertEquals("The two arrays should be the same size!", names.length, expected.length); + SortedSet nodeNames = new TreeSet(); + for (String name : names) { + nodeNames.add(new ZNodeName(name)); + } + + int index = 0; + for (ZNodeName nodeName : nodeNames) { + String name = nodeName.getName(); + assertEquals("Node " + index, expected[index++], name); + } + } + +} Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java =================================================================== --- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java (revision 0) +++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java (revision 0) @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class IDGenClient extends Thread { + + String connectionStr; + String base_dir; + ZKUtil zkutil; + Random sleepTime = new Random(); + int runtime; + HashMap idMap; + String tableName; + + IDGenClient(String connectionStr, String base_dir, int time, String tableName) { + super(); + this.connectionStr = connectionStr; + this.base_dir = base_dir; + this.zkutil = new ZKUtil(connectionStr, base_dir); + this.runtime = time; + idMap = new HashMap(); + this.tableName = tableName; + } + + /* + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + long startTime = System.currentTimeMillis(); + int timeElapsed = 0; + while( timeElapsed <= runtime){ + try { + long id = zkutil.nextId(tableName); + idMap.put(System.currentTimeMillis(), id); + + int sTime = sleepTime.nextInt(2); + Thread.sleep(sTime * 100); + } catch (Exception e) { + e.printStackTrace(); + } + + timeElapsed = (int) Math.ceil((System.currentTimeMillis() - startTime)/(double)1000); + } + + } + + Map getIdMap(){ + return idMap; + } + +} Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java =================================================================== --- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java (revision 0) +++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java (revision 0) @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; + +import org.apache.hcatalog.hbase.SkeletonHBaseTest; +import org.junit.Assert; +import org.junit.Test; + +public class TestIDGenerator extends SkeletonHBaseTest{ + + @Test + public void testIDGeneration() throws Exception { + + int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); + String servers = getHbaseConf().get("hbase.zookeeper.quorum"); + String[] splits = servers.split(","); + StringBuffer sb = new StringBuffer(); + for(String split : splits){ + sb.append(split); + sb.append(':'); + sb.append(port); + } + ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base"); + + String tableName = "myTable"; + long initId = zkutil.nextId(tableName); + for (int i=0; i<10; i++) { + long id = zkutil.nextId(tableName); + Assert.assertEquals(initId + (i + 1), id); + } + } + + @Test + public void testMultipleClients() throws InterruptedException{ + + int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); + String servers = getHbaseConf().get("hbase.zookeeper.quorum"); + String[] splits = servers.split(","); + StringBuffer sb = new StringBuffer(); + for(String split : splits){ + sb.append(split); + sb.append(':'); + sb.append(port); + } + + ArrayList clients = new ArrayList(); + + for(int i =0; i < 5; i++){ + IDGenClient idClient = new IDGenClient(sb.toString(), "/rm_base", 10, "testTable"); + clients.add(idClient); + } + + for(IDGenClient idClient : clients){ + idClient.run(); + } + + for(IDGenClient idClient : clients){ + idClient.join(); + } + + HashMap idMap = new HashMap(); + for(IDGenClient idClient : clients){ + idMap.putAll(idClient.getIdMap()); + } + + ArrayList keys = new ArrayList(idMap.keySet()); + Collections.sort(keys); + int startId = 1; + for(Long key: keys){ + Long id = idMap.get(key); + System.out.println("Key: " + key + " Value "+ id); + assertTrue(id == startId); + startId++; + + } + } +} Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java =================================================================== --- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (revision 0) +++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (revision 0) @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.hcatalog.hbase.SkeletonHBaseTest; +import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.junit.Test; + +public class TestRevisionManager extends SkeletonHBaseTest{ + + @Test + public void testBasicZNodeCreation() throws IOException, KeeperException, InterruptedException{ + + int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); + String servers = getHbaseConf().get("hbase.zookeeper.quorum"); + String[] splits = servers.split(","); + StringBuffer sb = new StringBuffer(); + for(String split : splits){ + sb.append(split); + sb.append(':'); + sb.append(port); + } + + ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base"); + String tableName = newTableName("testTable"); + List columnFamilies = Arrays.asList("cf001", "cf002", "cf003"); + + zkutil.createRootZNodes(); + ZooKeeper zk = zkutil.getSession(); + Stat tempTwo = zk.exists("/rm_base" + PathUtil.DATA_DIR, false); + assertTrue(tempTwo != null); + Stat tempThree = zk.exists("/rm_base" + PathUtil.CLOCK_NODE, false); + assertTrue(tempThree != null); + + zkutil.setUpZnodesForTable(tableName, columnFamilies); + String transactionDataTablePath = "/rm_base" + PathUtil.DATA_DIR + "/" + tableName; + Stat result = zk.exists(transactionDataTablePath, false); + assertTrue(result != null); + + for(String colFamiliy : columnFamilies){ + String cfPath = transactionDataTablePath + "/" + colFamiliy; + Stat resultTwo = zk.exists(cfPath, false); + assertTrue(resultTwo != null); + } + + } + + @Test + public void testCommitTransaction() throws IOException{ + + int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); + String servers = getHbaseConf().get("hbase.zookeeper.quorum"); + String[] splits = servers.split(","); + StringBuffer sb = new StringBuffer(); + for(String split : splits){ + sb.append(split); + sb.append(':'); + sb.append(port); + } + + Properties props = new Properties(); + props.put(ZKBasedRevisionManager.HOSTLIST, sb.toString()); + props.put(ZKBasedRevisionManager.DATADIR, "/rm_base"); + ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); + manager.initialize(props); + manager.open(); + ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base"); + + String tableName = newTableName("testTable"); + List columnFamilies = Arrays.asList("cf1", "cf2", "cf3"); + Transaction txn = manager.beginWriteTransaction(tableName, + columnFamilies); + + List cfs = zkutil.getColumnFamiliesOfTable(tableName); + assertTrue(cfs.size() == columnFamilies.size()); + for (String cf : cfs){ + assertTrue(columnFamilies.contains(cf)); + } + + for(String colFamily : columnFamilies){ + String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily); + byte[] data = zkutil.getRawData(path, null); + StoreFamilyRevisionList list = new StoreFamilyRevisionList(); + ZKUtil.deserialize(list, data); + assertEquals(list.getRevisionListSize(), 1); + StoreFamilyRevision lightTxn = list.getRevisionList().get(0); + assertEquals(lightTxn.timestamp, txn.getTransactionExpireTimeStamp()); + assertEquals(lightTxn.revision, txn.getRevisionNumber()); + + } + manager.commitWriteTransaction(txn); + for(String colFamiliy : columnFamilies){ + String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamiliy); + byte[] data = zkutil.getRawData(path, null); + StoreFamilyRevisionList list = new StoreFamilyRevisionList(); + ZKUtil.deserialize(list, data); + assertEquals(list.getRevisionListSize(), 0); + + } + + manager.close(); + } + + @Test + public void testAbortTransaction() throws IOException{ + + int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); + String host = getHbaseConf().get("hbase.zookeeper.quorum"); + Properties props = new Properties(); + props.put(ZKBasedRevisionManager.HOSTLIST, host + ':' + port); + props.put(ZKBasedRevisionManager.DATADIR, "/rm_base"); + ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); + manager.initialize(props); + manager.open(); + ZKUtil zkutil = new ZKUtil(host + ':' + port, "/rm_base"); + + String tableName = newTableName("testTable"); + List columnFamilies = Arrays.asList("cf1", "cf2", "cf3"); + Transaction txn = manager.beginWriteTransaction(tableName, columnFamilies); + List cfs = zkutil.getColumnFamiliesOfTable(tableName); + + assertTrue(cfs.size() == columnFamilies.size()); + for (String cf : cfs){ + assertTrue(columnFamilies.contains(cf)); + } + + for(String colFamiliy : columnFamilies){ + String path = PathUtil.getRunningTxnInfoPath("/rm_base",tableName, colFamiliy); + byte[] data = zkutil.getRawData(path, null); + StoreFamilyRevisionList list = new StoreFamilyRevisionList(); + ZKUtil.deserialize(list, data); + assertEquals(list.getRevisionListSize(), 1); + StoreFamilyRevision lightTxn = list.getRevisionList().get(0); + assertEquals(lightTxn.timestamp, txn.getTransactionExpireTimeStamp()); + assertEquals(lightTxn.revision, txn.getRevisionNumber()); + + } + manager.abortWriteTransaction(txn); + for(String colFamiliy : columnFamilies){ + String path = PathUtil.getRunningTxnInfoPath("/rm_base",tableName, colFamiliy); + byte[] data = zkutil.getRawData(path, null); + StoreFamilyRevisionList list = new StoreFamilyRevisionList(); + ZKUtil.deserialize(list, data); + assertEquals(list.getRevisionListSize(), 0); + + } + + for(String colFamiliy : columnFamilies){ + String path = PathUtil.getAbortInformationPath("/rm_base",tableName, colFamiliy); + byte[] data = zkutil.getRawData(path, null); + StoreFamilyRevisionList list = new StoreFamilyRevisionList(); + ZKUtil.deserialize(list, data); + assertEquals(list.getRevisionListSize(), 1); + StoreFamilyRevision abortedTxn = list.getRevisionList().get(0); + assertEquals(abortedTxn.getRevision(), txn.getRevisionNumber()); + } + manager.close(); + } + + @Test + public void testKeepAliveTransaction() throws InterruptedException, IOException { + + int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); + String servers = getHbaseConf().get("hbase.zookeeper.quorum"); + String[] splits = servers.split(","); + StringBuffer sb = new StringBuffer(); + for(String split : splits){ + sb.append(split); + sb.append(':'); + sb.append(port); + } + + Properties props = new Properties(); + props.put(ZKBasedRevisionManager.HOSTLIST, sb.toString()); + props.put(ZKBasedRevisionManager.DATADIR, "/rm_base"); + ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); + manager.initialize(props); + manager.open(); + String tableName = newTableName("testTable"); + List columnFamilies = Arrays.asList("cf1", "cf2"); + Transaction txn = manager.beginWriteTransaction(tableName, + columnFamilies, 40); + Thread.sleep(100); + try { + manager.commitWriteTransaction(txn); + } catch (Exception e) { + assertTrue(e instanceof IOException); + assertEquals(e.getMessage(), + "The transaction to be removed not found in the data."); + } + + } + + @Test + public void testCreateSnapshot() throws IOException{ + int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); + String host = getHbaseConf().get("hbase.zookeeper.quorum"); + Properties props = new Properties(); + props.put(ZKBasedRevisionManager.HOSTLIST, host + ':' + port); + props.put(ZKBasedRevisionManager.DATADIR, "/rm_base"); + ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); + manager.initialize(props); + manager.open(); + String tableName = newTableName("testTable"); + List cfOne = Arrays.asList("cf1", "cf2"); + List cfTwo = Arrays.asList("cf2", "cf3"); + Transaction tsx1 = manager.beginWriteTransaction(tableName, cfOne); + Transaction tsx2 = manager.beginWriteTransaction(tableName, cfTwo); + TableSnapshot snapshotOne = manager.createSnapshot(tableName); + assertEquals(snapshotOne.getRevision("cf1"), 0); + assertEquals(snapshotOne.getRevision("cf2"), 0); + assertEquals(snapshotOne.getRevision("cf3"), 1); + + List cfThree = Arrays.asList("cf1", "cf3"); + Transaction tsx3 = manager.beginWriteTransaction(tableName, cfThree); + manager.commitWriteTransaction(tsx1); + TableSnapshot snapshotTwo = manager.createSnapshot(tableName); + assertEquals(snapshotTwo.getRevision("cf1"), 2); + assertEquals(snapshotTwo.getRevision("cf2"), 1); + assertEquals(snapshotTwo.getRevision("cf3"), 1); + + manager.commitWriteTransaction(tsx2); + TableSnapshot snapshotThree = manager.createSnapshot(tableName); + assertEquals(snapshotThree.getRevision("cf1"), 2); + assertEquals(snapshotThree.getRevision("cf2"), 3); + assertEquals(snapshotThree.getRevision("cf3"), 2); + manager.commitWriteTransaction(tsx3); + TableSnapshot snapshotFour = manager.createSnapshot(tableName); + assertEquals(snapshotFour.getRevision("cf1"), 3); + assertEquals(snapshotFour.getRevision("cf2"), 3); + assertEquals(snapshotFour.getRevision("cf3"), 3); + + } + + +} Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java =================================================================== --- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java (revision 0) +++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java (revision 0) @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*; +import org.junit.Test; + +public class TestThriftSerialization { + + @Test + public void testLightWeightTransaction(){ + StoreFamilyRevision trxn = new StoreFamilyRevision(0, 1000); + try { + + byte[] data = ZKUtil.serialize(trxn); + StoreFamilyRevision newWtx = new StoreFamilyRevision(); + ZKUtil.deserialize(newWtx, data); + + assertTrue(newWtx.getRevision() == trxn.getRevision()); + assertTrue(newWtx.getTimestamp() == trxn.getTimestamp()); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testWriteTransactionList(){ + List txnList = new ArrayList(); + long version; + long timestamp; + for( int i = 0; i < 10; i++){ + version = i; + timestamp = 1000 + i; + StoreFamilyRevision wtx = new StoreFamilyRevision(version, timestamp); + txnList.add(wtx); + } + + StoreFamilyRevisionList wList = new StoreFamilyRevisionList(txnList); + + try { + byte[] data = ZKUtil.serialize(wList); + StoreFamilyRevisionList newList = new StoreFamilyRevisionList(); + ZKUtil.deserialize(newList, data); + assertTrue(newList.getRevisionListSize() == wList.getRevisionListSize()); + + Iterator itr = newList.getRevisionListIterator(); + int i = 0; + while(itr.hasNext()){ + StoreFamilyRevision txn = itr.next(); + assertTrue(txn.getRevision() == i); + assertTrue(txn.getTimestamp() == (i + 1000)); + i++; + } + + } catch (IOException e) { + e.printStackTrace(); + } + } + +}