Index: storage-drivers/hbase/build.xml
===================================================================
--- storage-drivers/hbase/build.xml (revision 1211608)
+++ storage-drivers/hbase/build.xml (working copy)
@@ -35,7 +35,6 @@
-
@@ -65,6 +64,8 @@
+
+
@@ -110,6 +111,8 @@
+
+
@@ -125,6 +128,31 @@
includes="hadoop-test-0.20.3-CDH3-SNAPSHOT.jar"/>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
-
+
+
@@ -269,6 +300,27 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Tests failed!
@@ -325,8 +377,10 @@
+
+
Index: storage-drivers/hbase/ivy.xml
===================================================================
--- storage-drivers/hbase/ivy.xml (revision 1211608)
+++ storage-drivers/hbase/ivy.xml (working copy)
@@ -41,6 +41,11 @@
-
+
+
+
+
+
Index: storage-drivers/hbase/ivy/libraries.properties
===================================================================
--- storage-drivers/hbase/ivy/libraries.properties (revision 1211608)
+++ storage-drivers/hbase/ivy/libraries.properties (working copy)
@@ -17,4 +17,5 @@
ivy.version=2.2.0
rats-lib.version=0.5.1
hbase.version=0.90.3
-zookeeper.version=3.3.1
+zookeeper.version=3.4.0
+thrift.version=0.7.0
Index: storage-drivers/hbase/if/transaction.thrift
===================================================================
--- storage-drivers/hbase/if/transaction.thrift (revision 0)
+++ storage-drivers/hbase/if/transaction.thrift (revision 0)
@@ -0,0 +1,11 @@
+namespace java org.apache.hcatalog.hbase.snapshot.transaction.thrift
+namespace cpp Apache.HCatalog.HBase
+
+struct StoreFamilyRevision {
+ 1: i64 revision,
+ 2: i64 timestamp
+}
+
+struct StoreFamilyRevisionList {
+ 1: list<StoreFamilyRevision> revisionList
+}
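The generated sources under src/gen-java below correspond to this IDL compiled with Thrift 0.7.0 (roughly, thrift --gen java if/transaction.thrift). As a rough sketch of how the generated types can be turned into the byte[] payload a znode would hold, assuming only the stock libthrift 0.7 TSerializer/TDeserializer and an arbitrary choice of TBinaryProtocol (the patch's own ZKUtil serialization is not shown here):

import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.hcatalog.hbase.snapshot.transaction.thrift.StoreFamilyRevision;
import org.apache.hcatalog.hbase.snapshot.transaction.thrift.StoreFamilyRevisionList;

public class RevisionListCodecSketch {
    public static void main(String[] args) throws TException {
        // Build a revision list as it might be written to a column-family znode.
        StoreFamilyRevisionList list = new StoreFamilyRevisionList();
        list.addToRevisionList(new StoreFamilyRevision(1L, System.currentTimeMillis()));

        // Serialize to bytes (the shape of a znode payload) and read it back.
        byte[] payload = new TSerializer(new TBinaryProtocol.Factory()).serialize(list);
        StoreFamilyRevisionList copy = new StoreFamilyRevisionList();
        new TDeserializer(new TBinaryProtocol.Factory()).deserialize(copy, payload);
        System.out.println(copy);
    }
}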
Index: storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java
===================================================================
--- storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java (revision 0)
+++ storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java (revision 0)
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This class is used to store the revision and timestamp of a column family
+ * in a transaction.
+ *
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.hcatalog.hbase.snapshot.transaction.thrift;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StoreFamilyRevision implements org.apache.thrift.TBase<StoreFamilyRevision, StoreFamilyRevision._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StoreFamilyRevision");
+
+ private static final org.apache.thrift.protocol.TField REVISION_FIELD_DESC = new org.apache.thrift.protocol.TField("revision", org.apache.thrift.protocol.TType.I64, (short)1);
+ private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)2);
+
+ public long revision; // required
+ public long timestamp; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ REVISION((short)1, "revision"),
+ TIMESTAMP((short)2, "timestamp");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // REVISION
+ return REVISION;
+ case 2: // TIMESTAMP
+ return TIMESTAMP;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __REVISION_ISSET_ID = 0;
+ private static final int __TIMESTAMP_ISSET_ID = 1;
+ private BitSet __isset_bit_vector = new BitSet(2);
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.REVISION, new org.apache.thrift.meta_data.FieldMetaData("revision", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StoreFamilyRevision.class, metaDataMap);
+ }
+
+ public StoreFamilyRevision() {
+ }
+
+ public StoreFamilyRevision(
+ long revision,
+ long timestamp)
+ {
+ this();
+ this.revision = revision;
+ setRevisionIsSet(true);
+ this.timestamp = timestamp;
+ setTimestampIsSet(true);
+ }
+
+ /**
+ * Performs a deep copy on other.
+ */
+ public StoreFamilyRevision(StoreFamilyRevision other) {
+ __isset_bit_vector.clear();
+ __isset_bit_vector.or(other.__isset_bit_vector);
+ this.revision = other.revision;
+ this.timestamp = other.timestamp;
+ }
+
+ public StoreFamilyRevision deepCopy() {
+ return new StoreFamilyRevision(this);
+ }
+
+ @Override
+ public void clear() {
+ setRevisionIsSet(false);
+ this.revision = 0;
+ setTimestampIsSet(false);
+ this.timestamp = 0;
+ }
+
+ public long getRevision() {
+ return this.revision;
+ }
+
+ public StoreFamilyRevision setRevision(long revision) {
+ this.revision = revision;
+ setRevisionIsSet(true);
+ return this;
+ }
+
+ public void unsetRevision() {
+ __isset_bit_vector.clear(__REVISION_ISSET_ID);
+ }
+
+ /** Returns true if field revision is set (has been assigned a value) and false otherwise */
+ public boolean isSetRevision() {
+ return __isset_bit_vector.get(__REVISION_ISSET_ID);
+ }
+
+ public void setRevisionIsSet(boolean value) {
+ __isset_bit_vector.set(__REVISION_ISSET_ID, value);
+ }
+
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+
+ public StoreFamilyRevision setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ setTimestampIsSet(true);
+ return this;
+ }
+
+ public void unsetTimestamp() {
+ __isset_bit_vector.clear(__TIMESTAMP_ISSET_ID);
+ }
+
+ /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
+ public boolean isSetTimestamp() {
+ return __isset_bit_vector.get(__TIMESTAMP_ISSET_ID);
+ }
+
+ public void setTimestampIsSet(boolean value) {
+ __isset_bit_vector.set(__TIMESTAMP_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case REVISION:
+ if (value == null) {
+ unsetRevision();
+ } else {
+ setRevision((Long)value);
+ }
+ break;
+
+ case TIMESTAMP:
+ if (value == null) {
+ unsetTimestamp();
+ } else {
+ setTimestamp((Long)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case REVISION:
+ return Long.valueOf(getRevision());
+
+ case TIMESTAMP:
+ return Long.valueOf(getTimestamp());
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case REVISION:
+ return isSetRevision();
+ case TIMESTAMP:
+ return isSetTimestamp();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof StoreFamilyRevision)
+ return this.equals((StoreFamilyRevision)that);
+ return false;
+ }
+
+ public boolean equals(StoreFamilyRevision that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_revision = true;
+ boolean that_present_revision = true;
+ if (this_present_revision || that_present_revision) {
+ if (!(this_present_revision && that_present_revision))
+ return false;
+ if (this.revision != that.revision)
+ return false;
+ }
+
+ boolean this_present_timestamp = true;
+ boolean that_present_timestamp = true;
+ if (this_present_timestamp || that_present_timestamp) {
+ if (!(this_present_timestamp && that_present_timestamp))
+ return false;
+ if (this.timestamp != that.timestamp)
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(StoreFamilyRevision other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ StoreFamilyRevision typedOther = (StoreFamilyRevision)other;
+
+ lastComparison = Boolean.valueOf(isSetRevision()).compareTo(typedOther.isSetRevision());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRevision()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.revision, typedOther.revision);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetTimestamp()).compareTo(typedOther.isSetTimestamp());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTimestamp()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, typedOther.timestamp);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // REVISION
+ if (field.type == org.apache.thrift.protocol.TType.I64) {
+ this.revision = iprot.readI64();
+ setRevisionIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2: // TIMESTAMP
+ if (field.type == org.apache.thrift.protocol.TType.I64) {
+ this.timestamp = iprot.readI64();
+ setTimestampIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldBegin(REVISION_FIELD_DESC);
+ oprot.writeI64(this.revision);
+ oprot.writeFieldEnd();
+ oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+ oprot.writeI64(this.timestamp);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("StoreFamilyRevision(");
+ boolean first = true;
+
+ sb.append("revision:");
+ sb.append(this.revision);
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("timestamp:");
+ sb.append(this.timestamp);
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+
Index: storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java
===================================================================
--- storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java (revision 0)
+++ storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java (revision 0)
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This class is used to store a list of StoreFamilyRevision for a column
+ * family in zookeeper.
+ *
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.hcatalog.hbase.snapshot.transaction.thrift;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StoreFamilyRevisionList implements org.apache.thrift.TBase<StoreFamilyRevisionList, StoreFamilyRevisionList._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StoreFamilyRevisionList");
+
+ private static final org.apache.thrift.protocol.TField REVISION_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("revisionList", org.apache.thrift.protocol.TType.LIST, (short)1);
+
+ public List<StoreFamilyRevision> revisionList; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ REVISION_LIST((short)1, "revisionList");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // REVISION_LIST
+ return REVISION_LIST;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.REVISION_LIST, new org.apache.thrift.meta_data.FieldMetaData("revisionList", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, StoreFamilyRevision.class))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StoreFamilyRevisionList.class, metaDataMap);
+ }
+
+ public StoreFamilyRevisionList() {
+ }
+
+ public StoreFamilyRevisionList(
+ List<StoreFamilyRevision> revisionList)
+ {
+ this();
+ this.revisionList = revisionList;
+ }
+
+ /**
+ * Performs a deep copy on other.
+ */
+ public StoreFamilyRevisionList(StoreFamilyRevisionList other) {
+ if (other.isSetRevisionList()) {
+ List<StoreFamilyRevision> __this__revisionList = new ArrayList<StoreFamilyRevision>();
+ for (StoreFamilyRevision other_element : other.revisionList) {
+ __this__revisionList.add(new StoreFamilyRevision(other_element));
+ }
+ this.revisionList = __this__revisionList;
+ }
+ }
+
+ public StoreFamilyRevisionList deepCopy() {
+ return new StoreFamilyRevisionList(this);
+ }
+
+ @Override
+ public void clear() {
+ this.revisionList = null;
+ }
+
+ public int getRevisionListSize() {
+ return (this.revisionList == null) ? 0 : this.revisionList.size();
+ }
+
+ public java.util.Iterator<StoreFamilyRevision> getRevisionListIterator() {
+ return (this.revisionList == null) ? null : this.revisionList.iterator();
+ }
+
+ public void addToRevisionList(StoreFamilyRevision elem) {
+ if (this.revisionList == null) {
+ this.revisionList = new ArrayList<StoreFamilyRevision>();
+ }
+ this.revisionList.add(elem);
+ }
+
+ public List<StoreFamilyRevision> getRevisionList() {
+ return this.revisionList;
+ }
+
+ public StoreFamilyRevisionList setRevisionList(List<StoreFamilyRevision> revisionList) {
+ this.revisionList = revisionList;
+ return this;
+ }
+
+ public void unsetRevisionList() {
+ this.revisionList = null;
+ }
+
+ /** Returns true if field revisionList is set (has been assigned a value) and false otherwise */
+ public boolean isSetRevisionList() {
+ return this.revisionList != null;
+ }
+
+ public void setRevisionListIsSet(boolean value) {
+ if (!value) {
+ this.revisionList = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case REVISION_LIST:
+ if (value == null) {
+ unsetRevisionList();
+ } else {
+ setRevisionList((List<StoreFamilyRevision>)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case REVISION_LIST:
+ return getRevisionList();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case REVISION_LIST:
+ return isSetRevisionList();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof StoreFamilyRevisionList)
+ return this.equals((StoreFamilyRevisionList)that);
+ return false;
+ }
+
+ public boolean equals(StoreFamilyRevisionList that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_revisionList = true && this.isSetRevisionList();
+ boolean that_present_revisionList = true && that.isSetRevisionList();
+ if (this_present_revisionList || that_present_revisionList) {
+ if (!(this_present_revisionList && that_present_revisionList))
+ return false;
+ if (!this.revisionList.equals(that.revisionList))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(StoreFamilyRevisionList other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ StoreFamilyRevisionList typedOther = (StoreFamilyRevisionList)other;
+
+ lastComparison = Boolean.valueOf(isSetRevisionList()).compareTo(typedOther.isSetRevisionList());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRevisionList()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.revisionList, typedOther.revisionList);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // REVISION_LIST
+ if (field.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
+ this.revisionList = new ArrayList<StoreFamilyRevision>(_list0.size);
+ for (int _i1 = 0; _i1 < _list0.size; ++_i1)
+ {
+ StoreFamilyRevision _elem2; // required
+ _elem2 = new StoreFamilyRevision();
+ _elem2.read(iprot);
+ this.revisionList.add(_elem2);
+ }
+ iprot.readListEnd();
+ }
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.revisionList != null) {
+ oprot.writeFieldBegin(REVISION_LIST_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.revisionList.size()));
+ for (StoreFamilyRevision _iter3 : this.revisionList)
+ {
+ _iter3.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("StoreFamilyRevisionList(");
+ boolean first = true;
+
+ sb.append("revisionList:");
+ if (this.revisionList == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.revisionList);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (revision 0)
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+
+/**
+ * A FamilyRevision consists of a revision number and an expiration
+ * timestamp. When a write transaction starts, the transaction
+ * object is appended to the transaction list of each column
+ * family and stored in the corresponding znode. When a write transaction is
+ * committed, the transaction object is removed from the list.
+ */
+class FamilyRevision implements
+ Comparable<FamilyRevision> {
+
+ private long revision;
+
+ private long timestamp;
+
+ /**
+ * Create a FamilyRevision object
+ * @param rev revision number
+ * @param ts expiration timestamp
+ */
+ FamilyRevision(long rev, long ts) {
+ this.revision = rev;
+ this.timestamp = ts;
+ }
+
+ long getRevision() {
+ return revision;
+ }
+
+ long getExpireTimestamp() {
+ return timestamp;
+ }
+
+ void setExpireTimestamp(long ts) {
+ timestamp = ts;
+ }
+
+ @Override
+ public String toString() {
+ String description = "revision: " + revision + " ts: " + timestamp;
+ return description;
+ }
+
+ @Override
+ public int compareTo(FamilyRevision o) {
+ long d = revision - o.getRevision();
+ return (d < 0) ? -1 : (d > 0) ? 1 : 0;
+ }
+
+
+}
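Because FamilyRevision orders itself by revision number only (the expiration timestamp does not participate in compareTo), sorting a transaction list yields revision order regardless of expiry. A package-local sketch with made-up values:

package org.apache.hcatalog.hbase.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical values, purely to illustrate the ordering contract above.
class FamilyRevisionOrderingSketch {
    public static void main(String[] args) {
        List<FamilyRevision> revisions = new ArrayList<FamilyRevision>();
        revisions.add(new FamilyRevision(5L, 900L));
        revisions.add(new FamilyRevision(2L, 9999L));
        Collections.sort(revisions);
        // Prints [revision: 2 ts: 9999, revision: 5 ts: 900]
        System.out.println(revisions);
    }
}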
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java (revision 0)
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.hbase.snapshot.lock.LockListener;
+import org.apache.hcatalog.hbase.snapshot.lock.WriteLock;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * This class generates revision id's for transactions.
+ */
+class IDGenerator implements LockListener{
+
+ private ZooKeeper zookeeper;
+ private String zNodeDataLoc;
+ private String zNodeLockBasePath;
+ private long id;
+ private static final Log LOG = LogFactory.getLog(IDGenerator.class);
+
+ IDGenerator(ZooKeeper zookeeper, String tableName, String idGenNode)
+ throws IOException {
+ this.zookeeper = zookeeper;
+ this.zNodeDataLoc = idGenNode;
+ this.zNodeLockBasePath = PathUtil.getLockManagementNode(idGenNode);
+ }
+
+ /**
+ * This method obtains a revision id for a transaction.
+ *
+ * @return revision ID
+ * @throws IOException
+ */
+ public long obtainID() throws IOException{
+ WriteLock wLock = new WriteLock(zookeeper, zNodeLockBasePath, Ids.OPEN_ACL_UNSAFE);
+ wLock.setLockListener(this);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException("Unable to obtain lock to obtain id.");
+ } else {
+ id = incrementAndReadCounter();
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Exception while obtaining lock for ID.", e);
+ throw new IOException("Exception while obtaining lock for ID.", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Exception while obtaining lock for ID.", e);
+ throw new IOException("Exception while obtaining lock for ID.", e);
+ } finally {
+ wLock.unlock();
+ }
+ return id;
+ }
+
+ /**
+ * This method reads the latest revision ID that has been used. The ID
+ * returned by this method cannot be used for a transaction.
+ * @return revision ID
+ * @throws IOException
+ */
+ public long readID() throws IOException{
+ long curId;
+ try {
+ Stat stat = new Stat();
+ byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
+ curId = Long.parseLong(new String(data,Charset.forName("UTF-8")));
+ } catch (KeeperException e) {
+ LOG.warn("Exception while reading current revision id.", e);
+ throw new IOException("Exception while reading current revision id.", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Exception while reading current revision id.", e);
+ throw new IOException("Exception while reading current revision id.",e);
+ }
+
+ return curId;
+ }
+
+
+ private long incrementAndReadCounter() throws IOException{
+
+ long curId, usedId;
+ try {
+ Stat stat = new Stat();
+ byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
+ usedId = Long.parseLong((new String(data,Charset.forName("UTF-8"))));
+ curId = usedId +1;
+ String lastUsedID = String.valueOf(curId);
+ zookeeper.setData(this.zNodeDataLoc, lastUsedID.getBytes(Charset.forName("UTF-8")), -1 );
+
+ } catch (KeeperException e) {
+ LOG.warn("Exception while incrementing revision id.", e);
+ throw new IOException("Exception while incrementing revision id. ", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Exception while incrementing revision id.", e);
+ throw new IOException("Exception while incrementing revision id. ", e);
+ }
+
+ return curId;
+ }
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
+ */
+ @Override
+ public void lockAcquired() {
+
+
+ }
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
+ */
+ @Override
+ public void lockReleased() {
+
+ }
+
+
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java (revision 0)
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+
+/**
+ * The PathUtil class is a utility class to provide information about various
+ * znode paths. The following is the znode structure used for storing information.
+ * baseDir/ClockNode
+ * baseDir/TransactionBasePath
+ * baseDir/TransactionBasePath/TableA/revisionID
+ * baseDir/TransactionBasePath/TableA/columnFamily-1
+ * baseDir/TransactionBasePath/TableA/columnFamily-1/runningTxns
+ * baseDir/TransactionBasePath/TableA/columnFamily-1/abortedTxns
+ * baseDir/TransactionBasePath/TableB/revisionID
+ * baseDir/TransactionBasePath/TableB/columnFamily-1
+ * baseDir/TransactionBasePath/TableB/columnFamily-1/runningTxns
+ * baseDir/TransactionBasePath/TableB/columnFamily-1/abortedTxns
+
+ */
+public class PathUtil{
+
+ static final String DATA_DIR = "/data";
+ static final String CLOCK_NODE = "/clock";
+
+ /**
+ * This method returns the data path associated with the currently
+ * running transactions of a given table and column/column family.
+ * @param baseDir The base directory for revision management.
+ * @param tableName The name of the table.
+ * @param columnFamily The name of the column family.
+ * @return The path of the running transactions data.
+ */
+ static String getRunningTxnInfoPath(String baseDir, String tableName,
+ String columnFamily) {
+ String txnBasePath = getTransactionBasePath(baseDir);
+ String path = txnBasePath + "/" + tableName + "/" + columnFamily
+ + "/runningTxns";
+ return path;
+ }
+
+ /**
+ * This method returns the data path associated with the aborted
+ * transactions of a given table and column/column family.
+ * @param baseDir The base directory for revision management.
+ * @param tableName The name of the table.
+ * @param columnFamily The name of the column family.
+ * @return The path of the aborted transactions data.
+ */
+ static String getAbortInformationPath(String baseDir, String tableName,
+ String columnFamily) {
+ String txnBasePath = getTransactionBasePath(baseDir);
+ String path = txnBasePath + "/" + tableName + "/" + columnFamily
+ + "/abortData";
+ return path;
+ }
+
+ /**
+ * Gets the revision id node for a given table.
+ *
+ * @param baseDir the base dir for revision management.
+ * @param tableName the table name
+ * @return the revision id node path.
+ */
+ static String getRevisionIDNode(String baseDir, String tableName) {
+ String rmBasePath = getTransactionBasePath(baseDir);
+ String revisionIDNode = rmBasePath + "/" + tableName + "/idgen";
+ return revisionIDNode;
+ }
+
+ /**
+ * Gets the lock management node for any znode that needs to be locked.
+ *
+ * @param path the path of the znode.
+ * @return the lock management node path.
+ */
+ static String getLockManagementNode(String path) {
+ String lockNode = path + "_locknode_";
+ return lockNode;
+ }
+
+ /**
+ * This method returns the base path for the transaction data.
+ *
+ * @param baseDir The base dir for revision management.
+ * @return The base path for the transaction data.
+ */
+ static String getTransactionBasePath(String baseDir) {
+ String txnBaseNode = baseDir + DATA_DIR;
+ return txnBaseNode;
+ }
+
+ /**
+ * Gets the txn data path for a given table.
+ *
+ * @param baseDir the base dir for revision management.
+ * @param tableName the table name
+ * @return the txn data path for the table.
+ */
+ static String getTxnDataPath(String baseDir, String tableName){
+ String txnBasePath = getTransactionBasePath(baseDir);
+ String path = txnBasePath + "/" + tableName;
+ return path;
+ }
+
+ /**
+ * This method returns the data path for clock node.
+ *
+ * @param baseDir The base directory for revision management.
+ * @return The data path for clock.
+ */
+ static String getClockPath(String baseDir) {
+ String clockNode = baseDir + CLOCK_NODE;
+ return clockNode;
+ }
+}
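To make the znode layout described above concrete, a package-local sketch (the helpers are package-private) with a hypothetical base directory, table, and column family; the expected paths follow directly from the methods above:

package org.apache.hcatalog.hbase.snapshot;

// Illustrative only; the expected values follow from the PathUtil methods above.
class PathUtilExample {
    public static void main(String[] args) {
        String base = "/revision-management";   // hypothetical base dir
        System.out.println(PathUtil.getRunningTxnInfoPath(base, "TableA", "cf1"));
        // -> /revision-management/data/TableA/cf1/runningTxns
        System.out.println(PathUtil.getAbortInformationPath(base, "TableA", "cf1"));
        // -> /revision-management/data/TableA/cf1/abortData
        String idGen = PathUtil.getRevisionIDNode(base, "TableA");
        System.out.println(idGen);
        // -> /revision-management/data/TableA/idgen
        System.out.println(PathUtil.getLockManagementNode(idGen));
        // -> /revision-management/data/TableA/idgen_locknode_
        System.out.println(PathUtil.getClockPath(base));
        // -> /revision-management/clock
    }
}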
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (revision 0)
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * This interface provides APIs for implementing revision management.
+ */
+public interface RevisionManager {
+
+ public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+
+ /**
+ * Initialize the revision manager.
+ */
+ public void initialize(Properties properties);
+
+ /**
+ * Opens the revision manager.
+ *
+ * @throws IOException
+ */
+ public void open() throws IOException;
+
+ /**
+ * Closes the revision manager.
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException;
+
+ /**
+ * Start the write transaction.
+ *
+ * @param table The table name.
+ * @param families The column families involved in the transaction.
+ * @return The transaction which was started.
+ * @throws IOException
+ */
+ public Transaction beginWriteTransaction(String table, List<String> families)
+ throws IOException;
+
+ /**
+ * Start the write transaction.
+ *
+ * @param table The table name.
+ * @param families The column families involved in the transaction.
+ * @param keepAlive The keep-alive value for the transaction.
+ * @return The transaction which was started.
+ * @throws IOException
+ */
+ public Transaction beginWriteTransaction(String table,
+ List<String> families, long keepAlive) throws IOException;
+
+ /**
+ * Commit the write transaction.
+ *
+ * @param transaction
+ * @throws IOException
+ */
+ public void commitWriteTransaction(Transaction transaction)
+ throws IOException;
+
+ /**
+ * Abort the write transaction.
+ *
+ * @param transaction
+ * @throws IOException
+ */
+ public void abortWriteTransaction(Transaction transaction)
+ throws IOException;
+
+ /**
+ * Create the latest snapshot of the table.
+ *
+ * @param tableName
+ * @return
+ * @throws IOException
+ */
+ public TableSnapshot createSnapshot(String tableName) throws IOException;
+
+ /**
+ * Create the snapshot of the table using the revision number.
+ *
+ * @param tableName
+ * @param revision
+ * @return
+ * @throws IOException
+ */
+ public TableSnapshot createSnapshot(String tableName, long revision)
+ throws IOException;
+
+ /**
+ * Extends the expiration of a transaction by the time indicated by keep alive.
+ *
+ * @param transaction
+ * @throws IOException
+ */
+ public void keepAlive(Transaction transaction) throws IOException;
+
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (revision 0)
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class RevisionManagerFactory {
+
+ /**
+ * Gets an instance of revision manager.
+ *
+ * @param properties The properties required to create the revision manager.
+ * @return An instance of the revision manager.
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public static RevisionManager getRevisionManager(Properties properties) throws IOException{
+
+ RevisionManager revisionMgr;
+ ClassLoader classLoader = Thread.currentThread()
+ .getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = RevisionManagerFactory.class.getClassLoader();
+ }
+ String className = properties.getProperty(
+ RevisionManager.REVISION_MGR_IMPL_CLASS,
+ ZKBasedRevisionManager.class.getName());
+ try {
+
+ @SuppressWarnings("unchecked")
+ Class<? extends RevisionManager> revisionMgrClass = (Class<? extends RevisionManager>) Class
+ .forName(className, true, classLoader);
+ revisionMgr = (RevisionManager) revisionMgrClass.newInstance();
+ revisionMgr.initialize(properties);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(
+ "The implementation class of revision manager not found.",
+ e);
+ } catch (InstantiationException e) {
+ throw new IOException(
+ "Exception encountered during instantiating revision manager implementation.",
+ e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(
+ "IllegalAccessException encountered during instantiating revision manager implementation.",
+ e);
+ } catch (IllegalArgumentException e) {
+ throw new IOException(
+ "IllegalArgumentException encountered during instantiating revision manager implementation.",
+ e);
+ }
+ return revisionMgr;
+ }
+
+}
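A sketch of how a client might wire these pieces together: obtain a RevisionManager from the factory, open it, and run a write transaction against it. The ZooKeeper quorum, base directory, table, and column family names are placeholders, and the HBase write itself is elided:

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;

public class RevisionManagerUsageSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.setProperty(ZKBasedRevisionManager.HOSTLIST, "localhost:2181");      // placeholder quorum
        props.setProperty(ZKBasedRevisionManager.DATADIR, "/revision-management"); // placeholder base dir

        RevisionManager rm = RevisionManagerFactory.getRevisionManager(props);
        rm.open();
        try {
            Transaction txn = rm.beginWriteTransaction("TableA", Arrays.asList("cf1"));
            try {
                // ... write the HBase cells for this revision, e.g. using
                // txn.getRevisionNumber() as the cell timestamp ...
                rm.commitWriteTransaction(txn);
            } catch (IOException e) {
                rm.abortWriteTransaction(txn);
                throw e;
            }
            // Readers build a snapshot of committed revisions for the table.
            TableSnapshot snapshot = rm.createSnapshot("TableA");
            System.out.println(snapshot);
        } finally {
            rm.close();
        }
    }
}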
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (revision 0)
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The snapshot for a table and a list of column families.
+ */
+public class TableSnapshot {
+
+ private String name;
+
+ private Map<String, Long> cfRevisionMap;
+
+
+ public TableSnapshot(String name, Map<String, Long> cfRevMap) {
+ this.name = name;
+ this.cfRevisionMap = cfRevMap;
+ }
+
+ /**
+ * Gets the table name.
+ *
+ * @return String The name of the table.
+ */
+ public String getTableName() {
+ return name;
+ }
+
+ /**
+ * Gets the column families.
+ *
+ * @return List A list of column families associated with the snapshot.
+ */
+ public List<String> getColumnFamilies(){
+ return new ArrayList<String>(this.cfRevisionMap.keySet());
+ }
+
+ /**
+ * Gets the revision.
+ *
+ * @param familyName The name of the column family.
+ * @return the revision
+ */
+ public long getRevision(String familyName){
+ return this.cfRevisionMap.get(familyName);
+ }
+
+ @Override
+ public String toString() {
+ String snapshot = "Table Name : " + name
+ + " Column Familiy revision : " + cfRevisionMap.toString();
+ return snapshot;
+ }
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (revision 0)
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is responsible for storing information related to
+ * transactions.
+ */
+public class Transaction implements Serializable {
+
+ private String tableName;
+ private List<String> columnFamilies = new ArrayList<String>();
+ private long timeStamp;
+ private long keepAlive;
+ private long revision;
+
+
+ Transaction(String tableName, List<String> columnFamilies, long revision, long timestamp) {
+ this.tableName = tableName;
+ this.columnFamilies = columnFamilies;
+ this.timeStamp = timestamp;
+ this.revision = revision;
+ }
+
+ /**
+ * @return The revision number associated with a transaction.
+ */
+ public long getRevisionNumber(){
+ return this.revision;
+ }
+
+ /**
+ * @return The table name associated with a transaction.
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @return The column families associated with a transaction.
+ */
+ public List<String> getColumnFamilies() {
+ return columnFamilies;
+ }
+
+ /**
+ * @return The expire timestamp associated with a transaction.
+ */
+ long getTransactionExpireTimeStamp(){
+ return this.timeStamp + this.keepAlive;
+ }
+
+ void setKeepAlive(long seconds){
+ this.keepAlive = seconds;
+ }
+
+ /**
+ * Gets the keep alive value.
+ *
+ * @return long The keep alive value for the transaction.
+ */
+ public long getKeepAliveValue(){
+ return this.keepAlive;
+ }
+
+ /**
+ * Gets the family revision info.
+ *
+ * @return FamilyRevision An instance of FamilyRevision associated with the transaction.
+ */
+ FamilyRevision getFamilyRevisionInfo(){
+ return new FamilyRevision(revision, getTransactionExpireTimeStamp());
+ }
+
+ /**
+ * Keeps the transaction alive. This method extends the expiration timestamp of a
+ * transaction by the "keep alive" amount.
+ */
+ void keepAliveTransaction(){
+ this.timeStamp = this.timeStamp + this.keepAlive;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Revision : ");
+ sb.append(this.getRevisionNumber());
+ sb.append(" Timestamp : ");
+ sb.append(this.getTransactionExpireTimeStamp());
+ sb.append("\n").append("Table : ");
+ sb.append(this.tableName).append("\n");
+ sb.append("Column Families : ");
+ sb.append(this.columnFamilies.toString());
+ return sb.toString();
+ }
+}
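A small package-local illustration of the expiration arithmetic above, with made-up numbers: a transaction created at timestamp 1000 with keepAlive 500 expires at 1500, and each keepAliveTransaction() call slides the window forward by another keepAlive:

package org.apache.hcatalog.hbase.snapshot;

import java.util.Arrays;

// Hypothetical numbers, purely to illustrate the expiration arithmetic of Transaction.
class TransactionExpirySketch {
    public static void main(String[] args) {
        Transaction txn = new Transaction("TableA", Arrays.asList("cf1"), 7L, 1000L);
        txn.setKeepAlive(500L);
        System.out.println(txn.getTransactionExpireTimeStamp()); // 1000 + 500 = 1500
        txn.keepAliveTransaction();                              // timeStamp becomes 1500
        System.out.println(txn.getTransactionExpireTimeStamp()); // 1500 + 500 = 2000
    }
}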
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (revision 0)
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.hbase.snapshot.lock.LockListener;
+import org.apache.hcatalog.hbase.snapshot.lock.WriteLock;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+/**
+ * The service for providing revision management to HBase tables.
+ */
+public class ZKBasedRevisionManager implements RevisionManager{
+
+ public static final String HOSTLIST = "revision.manager.zk.HostList";
+ public static final String DATADIR = "revision.manager.zk.DataDir";
+ private static int DEFAULT_WRITE_TRANSACTION_TIMEOUT = 14400000;
+ private static final Log LOG = LogFactory.getLog(ZKBasedRevisionManager.class);
+ private String zkHostList;
+ private String baseDir;
+ private ZKUtil zkUtil;
+
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#initialize()
+ */
+ @Override
+ public void initialize(Properties properties) {
+ this.zkHostList = properties.getProperty(ZKBasedRevisionManager.HOSTLIST, "localhost:2181");
+ this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,"/revision-management");
+ }
+
+ /**
+ * Open a ZooKeeper connection
+ * @throws java.io.IOException
+ */
+
+ public void open() throws IOException {
+ zkUtil = new ZKUtil(zkHostList, this.baseDir);
+ zkUtil.createRootZNodes();
+ LOG.info("Created root znodes for revision manager.");
+ }
+
+ /**
+ * Close Zookeeper connection
+ */
+ public void close() {
+ zkUtil.closeZKConnection();
+ }
+
+ private void checkInputParams(String table, List<String> families) {
+ if (table == null) {
+ throw new IllegalArgumentException(
+ "The table name must be specified for reading.");
+ }
+ if (families == null || families.isEmpty()) {
+ throw new IllegalArgumentException(
+ "At least one column family should be specified for reading.");
+ }
+ }
+
+
+ /** @param table The table name.
+ * @param families The column families involved in the transaction.
+ * @param keepAlive The keep-alive value for the transaction.
+ * @return The transaction which was started.
+ * @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
+ */
+ public Transaction beginWriteTransaction(String table,
+ List<String> families, long keepAlive) throws IOException {
+
+ checkInputParams(table, families);
+ zkUtil.setUpZnodesForTable(table, families);
+ long nextId = zkUtil.nextId(table);
+ long expireTimestamp = zkUtil.getTimeStamp();
+ Transaction transaction = new Transaction(table, families, nextId,
+ expireTimestamp);
+ if (keepAlive != -1) {
+ transaction.setKeepAlive(keepAlive);
+ } else {
+ transaction.setKeepAlive(DEFAULT_WRITE_TRANSACTION_TIMEOUT);
+ }
+
+ refreshTransactionList(transaction.getTableName());
+ String lockPath = prepareLockNode(table);
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock while beginning transaction. "
+ + transaction.toString());
+ } else {
+ List<String> colFamilies = transaction.getColumnFamilies();
+ FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+ for (String cfamily : colFamilies) {
+ String path = PathUtil.getRunningTxnInfoPath(
+ baseDir, table, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.APPEND);
+ }
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ }
+ finally {
+ wLock.unlock();
+ }
+
+ return transaction;
+ }
+
+ /** @param table The table name.
+ * @param families The column families involved in the transaction.
+ * @return The transaction which was started.
+ * @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List)
+ */
+ public Transaction beginWriteTransaction(String table, List<String> families)
+ throws IOException {
+ return beginWriteTransaction(table, families, -1);
+ }
+
+ /**
+ * This method commits a write transaction.
+ * @param transaction The revision information associated with transaction.
+ * @throws java.io.IOException
+ */
+ public void commitWriteTransaction(Transaction transaction) throws IOException {
+ refreshTransactionList(transaction.getTableName());
+
+ String lockPath = prepareLockNode(transaction.getTableName());
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock while commiting transaction. "
+ + transaction.toString());
+ } else {
+ String tableName = transaction.getTableName();
+ List<String> colFamilies = transaction.getColumnFamilies();
+ FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+ for (String cfamily : colFamilies) {
+ String path = PathUtil.getRunningTxnInfoPath(
+ baseDir, tableName, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.REMOVE);
+ }
+
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ }
+ finally {
+ wLock.unlock();
+ }
+ LOG.info("Write Transaction committed: " + transaction.toString());
+ }
+
+ /**
+ * This method aborts a write transaction.
+ * @param transaction The transaction to be aborted.
+ * @throws java.io.IOException
+ */
+ public void abortWriteTransaction(Transaction transaction) throws IOException {
+
+ refreshTransactionList(transaction.getTableName());
+ String lockPath = prepareLockNode(transaction.getTableName());
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock while aborting transaction. "
+ + transaction.toString());
+ } else {
+ String tableName = transaction.getTableName();
+ List<String> colFamilies = transaction.getColumnFamilies();
+ FamilyRevision revisionData = transaction
+ .getFamilyRevisionInfo();
+ for (String cfamily : colFamilies) {
+ String path = PathUtil.getRunningTxnInfoPath(
+ baseDir, tableName, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.REMOVE);
+ path = PathUtil.getAbortInformationPath(baseDir,
+ tableName, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.APPEND);
+ }
+
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ }
+ finally {
+ wLock.unlock();
+ }
+ LOG.info("Write Transaction aborted: " + transaction.toString());
+ }
+
+
+ /**
+ * @param transaction The transaction to be kept alive.
+ * @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction)
+ */
+ public void keepAlive(Transaction transaction)
+ throws IOException {
+
+ refreshTransactionList(transaction.getTableName());
+ transaction.keepAliveTransaction();
+ String lockPath = prepareLockNode(transaction.getTableName());
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock for keep alive of transaction. "
+ + transaction.toString());
+ }else {
+ String tableName = transaction.getTableName();
+ List<String> colFamilies = transaction.getColumnFamilies();
+ FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+ for (String cfamily : colFamilies) {
+ String path = PathUtil.getRunningTxnInfoPath(
+ baseDir, tableName, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.KEEP_ALIVE);
+ }
+
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ }finally {
+ wLock.unlock();
+ }
+
+ }
+
+ /**
+ * This method allows the user to create the latest snapshot of a table.
+ * @param tableName The table whose snapshot is being created.
+ * @return TableSnapshot An instance of TableSnapshot.
+ * @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#createSnapshot(java.lang.String)
+ */
+ public TableSnapshot createSnapshot(String tableName) throws IOException{
+ refreshTransactionList(tableName);
+ long latestID = zkUtil.currentID(tableName);
+ HashMap<String, Long> cfMap = new HashMap<String, Long>();
+ List<String> columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName);
+
+ for(String cfName: columnFamilyNames){
+ String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName);
+ List<FamilyRevision> tranxList = zkUtil.getTransactionList(cfPath);
+ long version;
+ if (!tranxList.isEmpty()) {
+ Collections.sort(tranxList);
+ // get the smallest running Transaction ID
+ long runningVersion = tranxList.get(0).getRevision();
+ version = runningVersion -1;
+ } else {
+ version = latestID;
+ }
+ cfMap.put(cfName, version);
+ }
+
+ return new TableSnapshot(tableName, cfMap);
+ }
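+
+ // Worked example of the revision selection above (values are illustrative):
+ // if the current revision counter is 9 and a column family has running
+ // transactions at revisions {5, 7}, the snapshot exposes revision 4
+ // (smallest running revision minus one); with no running transactions it
+ // exposes the current revision, 9.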
+
+ /**
+ * This method allows the user to create a snapshot of a table with a
+ * given revision number.
+ * @param tableName The table whose snapshot is being created.
+ * @param revision The revision number to use for the snapshot.
+ * @return TableSnapshot
+ * @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#createSnapshot(java.lang.String, long)
+ */
+ public TableSnapshot createSnapshot(String tableName, long revision) throws IOException{
+
+ long currentID = zkUtil.currentID(tableName);
+ if (revision > currentID) {
+ throw new IOException(
+ "The revision specified in the snapshot is higher than the current revision of the table.");
+ }
+ refreshTransactionList(tableName);
+ HashMap<String, Long> cfMap = new HashMap<String, Long>();
+ List<String> columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName);
+
+ for(String cf: columnFamilies){
+ cfMap.put(cf, revision);
+ }
+
+ return new TableSnapshot(tableName, cfMap);
+ }
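+
+ // Illustrative sketch: pinning a snapshot to a known revision for a read.
+ // The table name and revision are made-up examples; "rm" is an assumed
+ // instance of this revision manager.
+ //
+ // TableSnapshot snapshot = rm.createSnapshot("mytable", 42L);
+ // // every column family in this snapshot now reports revision 42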
+
+ /**
+ * Get the list of in-progress Transactions for a column family
+ * @param table the table name
+ * @param columnFamily the column family name
+ * @return a list of in-progress WriteTransactions
+ * @throws java.io.IOException
+ */
+ List<FamilyRevision> getRunningTransactions(String table,
+ String columnFamily) throws IOException {
+ String path = PathUtil.getRunningTxnInfoPath(baseDir, table,
+ columnFamily);
+ return zkUtil.getTransactionList(path);
+ }
+
+ /**
+ * Get the list of aborted Transactions for a column family
+ * @param table the table name
+ * @param columnFamily the column family name
+ * @return a list of aborted WriteTransactions
+ * @throws java.io.IOException
+ */
+ List<FamilyRevision> getAbortedWriteTransactions(String table,
+ String columnFamily) throws IOException {
+ String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
+ return zkUtil.getTransactionList(path);
+ }
+
+ private void refreshTransactionList(String tableName) throws IOException{
+ String lockPath = prepareLockNode(tableName);
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock while refreshing transactions of table "
+ + tableName + ".");
+ }else {
+ List<String> cfPaths = zkUtil
+ .getColumnFamiliesOfTable(tableName);
+ for (String cf : cfPaths) {
+ String runningDataPath = PathUtil.getRunningTxnInfoPath(
+ baseDir, tableName, cf);
+ zkUtil.refreshTransactions(runningDataPath);
+ }
+
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } finally {
+ wLock.unlock();
+ }
+
+ }
+
+ private String prepareLockNode(String tableName) throws IOException{
+ String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName);
+ String lockPath = PathUtil.getLockManagementNode(txnDataPath);
+ zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ return lockPath;
+ }
+
+
+ /*
+ * This class is a listener class for the locks used in revision management.
+ * TBD: Use the following class to signal that the lock has actually
+ * been granted.
+ */
+ class RMLockListener implements LockListener {
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
+ */
+ @Override
+ public void lockAcquired() {
+
+ }
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
+ */
+ @Override
+ public void lockReleased() {
+
+ }
+
+ }
+
+
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (revision 0)
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+class ZKUtil {
+
+ private int DEFAULT_SESSION_TIMEOUT = 1000000;
+ private ZooKeeper zkSession;
+ private String baseDir;
+ private String connectString;
+ private static final Log LOG = LogFactory.getLog(ZKUtil.class);
+
+ static enum UpdateMode {
+ APPEND, REMOVE, KEEP_ALIVE
+ };
+
+ ZKUtil(String connection, String baseDir) {
+ this.connectString = connection;
+ this.baseDir = baseDir;
+ }
+
+ /**
+ * This method creates znodes related to table.
+ *
+ * @param table The name of the table.
+ * @param families The list of column families of the table.
+ * @throws IOException
+ */
+ void setUpZnodesForTable(String table, List<String> families)
+ throws IOException {
+
+ String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir, table);
+ ensurePathExists(transactionDataTablePath, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ for (String cf : families) {
+ String runningDataPath = PathUtil.getRunningTxnInfoPath(
+ this.baseDir, table, cf);
+ ensurePathExists(runningDataPath, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ String abortDataPath = PathUtil.getAbortInformationPath(
+ this.baseDir, table, cf);
+ ensurePathExists(abortDataPath, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+
+ }
+
+ /**
+ * This method ensures that a given path exists in zookeeper. If the path
+ * does not exist, it creates it.
+ *
+ * @param path The path of znode that is required to exist.
+ * @param data The data to be associated with the znode.
+ * @param acl The ACLs required.
+ * @param flags The CreateMode for the znode.
+ * @throws IOException
+ */
+ void ensurePathExists(String path, byte[] data, List<ACL> acl,
+ CreateMode flags) throws IOException {
+ String[] dirs = path.split("/");
+ String parentPath = "";
+ for (String subDir : dirs) {
+ if (subDir.equals("") == false) {
+ parentPath = parentPath + "/" + subDir;
+ try {
+ Stat stat = getSession().exists(parentPath, false);
+ if (stat == null) {
+ getSession().create(parentPath, data, acl, flags);
+ }
+ } catch (Exception e) {
+ throw new IOException("Exception while creating path "
+ + parentPath, e);
+ }
+ }
+ }
+
+ }
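+
+ // Worked example (path is illustrative): for "/rm_base/trx/mytable" the loop
+ // above checks and, if missing, creates "/rm_base", then "/rm_base/trx", then
+ // "/rm_base/trx/mytable", each with the supplied data, ACL and create mode.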
+
+ /**
+ * This method returns a list of column families of a table which were used
+ * in any of the transactions.
+ *
+ * @param tableName The name of the table.
+ * @return The list of column families in the table.
+ * @throws IOException
+ */
+ List<String> getColumnFamiliesOfTable(String tableName) throws IOException {
+ String path = PathUtil.getTxnDataPath(baseDir, tableName);
+ List<String> children = null;
+ List<String> columnFamilies = new ArrayList<String>();
+ try {
+ children = getSession().getChildren(path, false);
+ } catch (KeeperException e) {
+ LOG.warn("Caught: ", e);
+ throw new IOException("Exception while obtaining columns of table.",e);
+ } catch (InterruptedException e) {
+ LOG.warn("Caught: ", e);
+ throw new IOException("Exception while obtaining columns of table.",e);
+ }
+
+ for (String child : children) {
+ if ((child.contains("idgen") == false)
+ && (child.contains("_locknode_") == false)) {
+ columnFamilies.add(child);
+ }
+ }
+ return columnFamilies;
+ }
+
+ /**
+ * This method returns a time stamp for use by the transactions.
+ *
+ * @return long The current timestamp in zookeeper.
+ * @throws IOException
+ */
+ long getTimeStamp() throws IOException {
+ long timeStamp;
+ Stat stat;
+ String clockPath = PathUtil.getClockPath(this.baseDir);
+ ensurePathExists(clockPath, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ try {
+ getSession().exists(clockPath, false);
+ stat = getSession().setData(clockPath, null, -1);
+
+ } catch (KeeperException e) {
+ LOG.warn("Caught: ", e);
+ throw new IOException("Exception while obtaining timestamp ", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Caught: ", e);
+ throw new IOException("Exception while obtaining timestamp ", e);
+ }
+ timeStamp = stat.getMtime();
+ return timeStamp;
+ }
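+
+ // Note: the timestamp is not read from the local clock. The setData() call on
+ // the shared clock znode is applied by the ZooKeeper server, so the returned
+ // Stat's mtime reflects the server's clock, giving every revision-manager
+ // client a common time source for transaction expiry.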
+
+ /**
+ * This method returns the next revision number to be used for any
+ * transaction purposes.
+ *
+ * @param tableName The name of the table.
+ * @return The next revision number to be used by a transaction.
+ * @throws IOException
+ */
+ long nextId(String tableName) throws IOException {
+ String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
+ ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ String lockNode = PathUtil.getLockManagementNode(idNode);
+ ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
+ long id = idf.obtainID();
+ return id;
+ }
+
+ /**
+ * The latest used revision id of the table.
+ *
+ * @param tableName The name of the table.
+ * @return The latest revision number used by any transaction.
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ long currentID(String tableName) throws IOException{
+ String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
+ ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ String lockNode = PathUtil.getLockManagementNode(idNode);
+ ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
+ long id = idf.readID();
+ return id;
+ }
+
+ /**
+ * This method retrieves the list of transaction information associated
+ * with each column/column family of a table.
+ *
+ * @param path The znode path
+ * @return The list of FamilyRevision transactions stored at the given path.
+ * @throws IOException
+ */
+ List<FamilyRevision> getTransactionList(String path)
+ throws IOException {
+
+ byte[] data = getRawData(path, new Stat());
+ ArrayList<FamilyRevision> wtxnList = new ArrayList<FamilyRevision>();
+ if (data == null) {
+ return wtxnList;
+ }
+ StoreFamilyRevisionList txnList = new StoreFamilyRevisionList();
+ deserialize(txnList, data);
+ Iterator<StoreFamilyRevision> itr = txnList.getRevisionListIterator();
+
+ while (itr.hasNext()) {
+ StoreFamilyRevision wtxn = itr.next();
+ wtxnList.add(new FamilyRevision(wtxn.getRevision(), wtxn
+ .getTimestamp()));
+ }
+
+ return wtxnList;
+ }
+
+ /**
+ * This method returns the data associated with the path in zookeeper.
+ *
+ * @param path The znode path
+ * @param stat Zookeeper stat
+ * @return byte array The data stored in the znode.
+ * @throws IOException
+ */
+ byte[] getRawData(String path, Stat stat) throws IOException {
+ byte[] data = null;
+ try {
+ data = getSession().getData(path, false, stat);
+ } catch (Exception e) {
+ throw new IOException(
+ "Exception while obtaining raw data from zookeeper path "
+ + path, e);
+ }
+ return data;
+ }
+
+ /**
+ * This method creates the basic znodes in zookeeper for revision
+ * management.
+ *
+ * @throws IOException
+ */
+ void createRootZNodes() throws IOException {
+ String txnBaseNode = PathUtil.getTransactionBasePath(this.baseDir);
+ String clockNode = PathUtil.getClockPath(this.baseDir);
+ ensurePathExists(txnBaseNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ ensurePathExists(clockNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+
+ /**
+ * This method closes the zookeeper session.
+ */
+ void closeZKConnection() {
+ if (zkSession != null) {
+ try {
+ zkSession.close();
+ } catch (InterruptedException e) {
+ LOG.warn("Close failed: ", e);
+ }
+ zkSession = null;
+ LOG.info("Disconnected to ZooKeeper");
+ }
+ }
+
+ /**
+ * This method returns a zookeeper session. If the current session is closed,
+ * then a new session is created.
+ *
+ * @return ZooKeeper An instance of zookeeper client.
+ * @throws IOException
+ */
+ ZooKeeper getSession() throws IOException {
+ if (zkSession == null || zkSession.getState() == States.CLOSED) {
+ synchronized (this) {
+ if (zkSession == null || zkSession.getState() == States.CLOSED) {
+ zkSession = new ZooKeeper(this.connectString,
+ this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher());
+ }
+ }
+ }
+ return zkSession;
+ }
+
+ /**
+ * This method updates the transaction data related to a znode.
+ *
+ * @param path The path to the transaction data.
+ * @param updateTx The FamilyRevision to be updated.
+ * @param mode The update mode: append, remove or keep alive.
+ * @throws IOException
+ */
+ void updateData(String path, FamilyRevision updateTx, UpdateMode mode)
+ throws IOException {
+
+ if (updateTx == null) {
+ throw new IOException(
+ "The transaction to be updated found to be null.");
+ }
+ List<FamilyRevision> currentData = getTransactionList(path);
+ List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
+ boolean dataFound = false;
+ long updateVersion = updateTx.getRevision();
+ for (FamilyRevision tranx : currentData) {
+ if (tranx.getRevision() != updateVersion) {
+ newData.add(tranx);
+ } else {
+ dataFound = true;
+ }
+ }
+ switch (mode) {
+ case REMOVE:
+ if (dataFound == false) {
+ throw new IOException(
+ "The transaction to be removed not found in the data.");
+ }
+ LOG.info("Removed trasaction : " + updateTx.toString());
+ break;
+ case KEEP_ALIVE:
+ if (dataFound == false) {
+ throw new IOException(
+ "The transaction to be kept alove not found in the data. It might have been expired.");
+ }
+ newData.add(updateTx);
+ LOG.info("keep alive of transaction : " + updateTx.toString());
+ break;
+ case APPEND:
+ if (dataFound == true) {
+ throw new IOException(
+ "The data to be appended already exists.");
+ }
+ newData.add(updateTx);
+ LOG.info("Added transaction : " + updateTx.toString());
+ break;
+ }
+
+ // For serialization purposes.
+ List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
+ for (FamilyRevision wtxn : newData) {
+ StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
+ wtxn.getExpireTimestamp());
+ newTxnList.add(newTxn);
+ }
+ StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
+ byte[] newByteData = serialize(wtxnList);
+
+ Stat stat = null;
+ try {
+ stat = zkSession.setData(path, newByteData, -1);
+ } catch (KeeperException e) {
+ throw new IOException(
+ "Exception while updating trasactional data. ", e);
+ } catch (InterruptedException e) {
+ throw new IOException(
+ "Exception while updating trasactional data. ", e);
+ }
+
+ if (stat != null) {
+ LOG.info("Transaction list stored at " + path + ".");
+ }
+
+ }
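+
+ // Illustrative sketch of the three update modes handled above; the path and
+ // FamilyRevision instance ("rev") are made-up examples.
+ //
+ // updateData(runningTxnPath, rev, UpdateMode.APPEND); // begin: add to list
+ // updateData(runningTxnPath, rev, UpdateMode.KEEP_ALIVE); // refresh expiry entry
+ // updateData(runningTxnPath, rev, UpdateMode.REMOVE); // commit/abort: drop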
+
+ /**
+ * Refresh transactions on a given transaction data path.
+ *
+ * @param path The path to the transaction data.
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ void refreshTransactions(String path) throws IOException{
+ List<FamilyRevision> currentData = getTransactionList(path);
+ List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
+
+ for (FamilyRevision tranx : currentData) {
+ if (tranx.getExpireTimestamp() > getTimeStamp()) {
+ newData.add(tranx);
+ }
+ }
+
+ if(newData.equals(currentData) == false){
+ List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
+ for (FamilyRevision wtxn : newData) {
+ StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
+ wtxn.getExpireTimestamp());
+ newTxnList.add(newTxn);
+ }
+ StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
+ byte[] newByteData = serialize(wtxnList);
+
+ try {
+ zkSession.setData(path, newByteData, -1);
+ } catch (KeeperException e) {
+ throw new IOException(
+ "Exception while updating trasactional data. ", e);
+ } catch (InterruptedException e) {
+ throw new IOException(
+ "Exception while updating trasactional data. ", e);
+ }
+
+ }
+
+ }
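+
+ // Note: entries whose expire timestamp is at or before the shared ZooKeeper
+ // clock are dropped here, which is how abandoned write transactions
+ // eventually disappear from the running list unless the writer refreshes
+ // them through keepAlive().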
+
+ /**
+ * This method serializes a given instance of TBase object.
+ *
+ * @param obj An instance of TBase
+ * @return byte array The serialized data.
+ * @throws IOException
+ */
+ static byte[] serialize(TBase obj) throws IOException {
+ if (obj == null)
+ return new byte[0];
+ try {
+ TSerializer serializer = new TSerializer(
+ new TBinaryProtocol.Factory());
+ byte[] bytes = serializer.serialize(obj);
+ return bytes;
+ } catch (Exception e) {
+ throw new IOException("Serialization error: ", e);
+ }
+ }
+
+
+ /**
+ * This method deserializes the given byte array into the TBase object.
+ *
+ * @param obj An instance of TBase
+ * @param data The serialized data to be deserialized.
+ * @throws IOException
+ */
+ static void deserialize(TBase obj, byte[] data) throws IOException {
+ if (data == null || data.length == 0)
+ return;
+ try {
+ TDeserializer deserializer = new TDeserializer(
+ new TBinaryProtocol.Factory());
+ deserializer.deserialize(obj, data);
+ } catch (Exception e) {
+ throw new IOException("Deserialization error: " + e.getMessage(), e);
+ }
+ }
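+
+ // Illustrative round trip through the two helpers above, using the
+ // Thrift-generated StoreFamilyRevision type (values are made-up examples):
+ //
+ // StoreFamilyRevision rev = new StoreFamilyRevision(1L, 1322000000000L);
+ // byte[] bytes = serialize(rev);
+ // StoreFamilyRevision copy = new StoreFamilyRevision();
+ // deserialize(copy, bytes);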
+
+ private class ZKWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ switch (event.getState()) {
+ case Expired:
+ LOG.info("The client session has expired. Try opening a new "
+ + "session and connecting again.");
+ zkSession = null;
+ break;
+ default:
+
+ }
+ }
+ }
+
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java (revision 0)
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+/**
+ * This interface has two callback methods which are invoked when a lock
+ * is acquired and when the lock is released.
+ * This interface has been used as-is from the zookeeper 3.3.4 recipes, with
+ * minor changes in the package name.
+ */
+public interface LockListener {
+ /**
+ * call back called when the lock
+ * is acquired
+ */
+ public void lockAcquired();
+
+ /**
+ * call back called when the lock is
+ * released.
+ */
+ public void lockReleased();
+}
\ No newline at end of file
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java (revision 0)
@@ -0,0 +1,193 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.hcatalog.hbase.snapshot.lock.ZooKeeperOperation;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A base class for protocol implementations which provides a number of higher
+ * level helper methods for working with ZooKeeper along with retrying synchronous
+ * operations if the connection to ZooKeeper closes such as
+ * {@link #retryOperation(ZooKeeperOperation)}
+ * This class has been used as-is from the zookeeper 3.4.0 recipes with
+ * changes in the retry delay, retry count values and package name.
+ */
+class ProtocolSupport {
+ private static final Logger LOG = Logger.getLogger(ProtocolSupport.class);
+
+ protected final ZooKeeper zookeeper;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+ private long retryDelay = 500L;
+ private int retryCount = 3;
+ private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+ public ProtocolSupport(ZooKeeper zookeeper) {
+ this.zookeeper = zookeeper;
+ }
+
+ /**
+ * Closes this strategy and releases any ZooKeeper resources; but keeps the
+ * ZooKeeper instance open
+ */
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ doClose();
+ }
+ }
+
+ /**
+ * return zookeeper client instance
+ * @return zookeeper client instance
+ */
+ public ZooKeeper getZookeeper() {
+ return zookeeper;
+ }
+
+ /**
+ * return the acl it is using
+ * @return the acl.
+ */
+ public List<ACL> getAcl() {
+ return acl;
+ }
+
+ /**
+ * set the acl
+ * @param acl the acl to set to
+ */
+ public void setAcl(List<ACL> acl) {
+ this.acl = acl;
+ }
+
+ /**
+ * get the retry delay in milliseconds
+ * @return the retry delay
+ */
+ public long getRetryDelay() {
+ return retryDelay;
+ }
+
+ /**
+ * Sets the time waited between retry delays
+ * @param retryDelay the retry delay
+ */
+ public void setRetryDelay(long retryDelay) {
+ this.retryDelay = retryDelay;
+ }
+
+ /**
+ * Allow derived classes to perform
+ * some custom closing operations to release resources
+ */
+ protected void doClose() {
+ }
+
+
+ /**
+ * Perform the given operation, retrying if the connection fails
+ * @return object. it needs to be cast to the callee's expected
+ * return type.
+ */
+ protected Object retryOperation(ZooKeeperOperation operation)
+ throws KeeperException, InterruptedException {
+ KeeperException exception = null;
+ for (int i = 0; i < retryCount; i++) {
+ try {
+ return operation.execute();
+ } catch (KeeperException.SessionExpiredException e) {
+ LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
+ throw e;
+ } catch (KeeperException.ConnectionLossException e) {
+ if (exception == null) {
+ exception = e;
+ }
+ LOG.debug("Attempt " + i + " failed with connection loss so " +
+ "attempting to reconnect: " + e, e);
+ retryDelay(i);
+ }
+ }
+ throw exception;
+ }
+
+ /**
+ * Ensures that the given path exists with no data, the current
+ * ACL and no flags
+ * @param path
+ */
+ protected void ensurePathExists(String path) {
+ ensureExists(path, null, acl, CreateMode.PERSISTENT);
+ }
+
+ /**
+ * Ensures that the given path exists with the given data, ACL and flags
+ * @param path
+ * @param acl
+ * @param flags
+ */
+ protected void ensureExists(final String path, final byte[] data,
+ final List<ACL> acl, final CreateMode flags) {
+ try {
+ retryOperation(new ZooKeeperOperation() {
+ public boolean execute() throws KeeperException, InterruptedException {
+ Stat stat = zookeeper.exists(path, false);
+ if (stat != null) {
+ return true;
+ }
+ zookeeper.create(path, data, acl, flags);
+ return true;
+ }
+ });
+ } catch (KeeperException e) {
+ LOG.warn("Caught: " + e, e);
+ } catch (InterruptedException e) {
+ LOG.warn("Caught: " + e, e);
+ }
+ }
+
+ /**
+ * Returns true if this protocol has been closed
+ * @return true if this protocol is closed
+ */
+ protected boolean isClosed() {
+ return closed.get();
+ }
+
+ /**
+ * Performs a retry delay if this is not the first attempt
+ * @param attemptCount the number of the attempts performed so far
+ */
+ protected void retryDelay(int attemptCount) {
+ if (attemptCount > 0) {
+ try {
+ Thread.sleep(attemptCount * retryDelay);
+ } catch (InterruptedException e) {
+ LOG.debug("Failed to sleep: " + e, e);
+ }
+ }
+ }
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java (revision 0)
@@ -0,0 +1,297 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A protocol to implement an exclusive
+ * write lock or to elect a leader. You invoke {@link #lock()} to
+ * start the process of grabbing the lock; you may get the lock then or it may be
+ * some time later. You can register a listener so that you are invoked
+ * when you get the lock; otherwise you can ask if you have the lock
+ * by calling {@link #isOwner()}
+ * This class has been used as-is from the zookeeper 3.4.0 recipes. The only change
+ * made is a TODO for sorting using suffixes and the package name.
+ */
+public class WriteLock extends ProtocolSupport {
+ private static final Logger LOG = Logger.getLogger(WriteLock.class);
+
+ private final String dir;
+ private String id;
+ private ZNodeName idName;
+ private String ownerId;
+ private String lastChildId;
+ private byte[] data = {0x12, 0x34};
+ private LockListener callback;
+ private LockZooKeeperOperation zop;
+
+ /**
+ * zookeeper constructor for writelock
+ * @param zookeeper zookeeper client instance
+ * @param dir the parent path you want to use for locking
+ * @param acl the acls that you want to use for all the paths,
+ * if null world read/write is used.
+ */
+ public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+ super(zookeeper);
+ this.dir = dir;
+ if (acl != null) {
+ setAcl(acl);
+ }
+ this.zop = new LockZooKeeperOperation();
+ }
+
+ /**
+ * zookeeper constructor for writelock with callback
+ * @param zookeeper the zookeeper client instance
+ * @param dir the parent path you want to use for locking
+ * @param acl the acls that you want to use for all the paths
+ * @param callback the call back instance
+ */
+ public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl,
+ LockListener callback) {
+ this(zookeeper, dir, acl);
+ this.callback = callback;
+ }
+
+ /**
+ * return the current locklistener
+ * @return the locklistener
+ */
+ public LockListener getLockListener() {
+ return this.callback;
+ }
+
+ /**
+ * register a different call back listener
+ * @param callback the call back instance
+ */
+ public void setLockListener(LockListener callback) {
+ this.callback = callback;
+ }
+
+ /**
+ * Removes the lock or associated znode if
+ * you no longer require the lock. this also
+ * removes your request in the queue for locking
+ * in case you do not already hold the lock.
+ * @throws RuntimeException throws a runtime exception
+ * if it cannot connect to zookeeper.
+ */
+ public synchronized void unlock() throws RuntimeException {
+
+ if (!isClosed() && id != null) {
+ // we don't need to retry this operation in the case of failure
+ // as ZK will remove ephemeral files and we don't wanna hang
+ // this process when closing if we cannot reconnect to ZK
+ try {
+
+ ZooKeeperOperation zopdel = new ZooKeeperOperation() {
+ public boolean execute() throws KeeperException,
+ InterruptedException {
+ zookeeper.delete(id, -1);
+ return Boolean.TRUE;
+ }
+ };
+ zopdel.execute();
+ } catch (InterruptedException e) {
+ LOG.warn("Caught: " + e, e);
+ //set that we have been interrupted.
+ Thread.currentThread().interrupt();
+ } catch (KeeperException.NoNodeException e) {
+ // do nothing
+ } catch (KeeperException e) {
+ LOG.warn("Caught: " + e, e);
+ throw (RuntimeException) new RuntimeException(e.getMessage()).
+ initCause(e);
+ }
+ finally {
+ if (callback != null) {
+ callback.lockReleased();
+ }
+ id = null;
+ }
+ }
+ }
+
+ /**
+ * the watcher called on
+ * getting watch while watching
+ * my predecessor
+ */
+ private class LockWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ // lets either become the leader or watch the new/updated node
+ LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
+ event.getState() + " type " + event.getType());
+ try {
+ lock();
+ } catch (Exception e) {
+ LOG.warn("Failed to acquire lock: " + e, e);
+ }
+ }
+ }
+
+ /**
+ * a zookeeper operation that is mainly responsible
+ * for all the magic required for locking.
+ */
+ private class LockZooKeeperOperation implements ZooKeeperOperation {
+
+ /** find if we have been created earlier; if not, create our node
+ *
+ * @param prefix the prefix node
+ * @param zookeeper the zookeeper client
+ * @param dir the parent dir
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
+ throws KeeperException, InterruptedException {
+ List<String> names = zookeeper.getChildren(dir, false);
+ for (String name : names) {
+ if (name.startsWith(prefix)) {
+ id = name;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found id created last time: " + id);
+ }
+ break;
+ }
+ }
+ if (id == null) {
+ id = zookeeper.create(dir + "/" + prefix, data,
+ getAcl(), EPHEMERAL_SEQUENTIAL);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created id: " + id);
+ }
+ }
+
+ }
+
+ /**
+ * the command that is run and retried for actually
+ * obtaining the lock
+ * @return if the command was successful or not
+ */
+ public boolean execute() throws KeeperException, InterruptedException {
+ do {
+ if (id == null) {
+ long sessionId = zookeeper.getSessionId();
+ String prefix = "x-" + sessionId + "-";
+ // lets try look up the current ID if we failed
+ // in the middle of creating the znode
+ findPrefixInChildren(prefix, zookeeper, dir);
+ idName = new ZNodeName(id);
+ }
+ if (id != null) {
+ List<String> names = zookeeper.getChildren(dir, false);
+ if (names.isEmpty()) {
+ LOG.warn("No children in: " + dir + " when we've just " +
+ "created one! Lets recreate it...");
+ // lets force the recreation of the id
+ id = null;
+ } else {
+ // lets sort them explicitly (though they do seem to come back in order usually :)
+ SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
+ for (String name : names) {
+ //TODO: Just use the suffix to sort.
+ sortedNames.add(new ZNodeName(dir + "/" + name));
+ }
+ ownerId = sortedNames.first().getName();
+ SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
+ if (!lessThanMe.isEmpty()) {
+ ZNodeName lastChildName = lessThanMe.last();
+ lastChildId = lastChildName.getName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("watching less than me node: " + lastChildId);
+ }
+ Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
+ if (stat != null) {
+ return Boolean.FALSE;
+ } else {
+ LOG.warn("Could not find the" +
+ " stats for less than me: " + lastChildName.getName());
+ }
+ } else {
+ if (isOwner()) {
+ if (callback != null) {
+ callback.lockAcquired();
+ }
+ return Boolean.TRUE;
+ }
+ }
+ }
+ }
+ }
+ while (id == null);
+ return Boolean.FALSE;
+ }
+ };
+
+ /**
+ * Attempts to acquire the exclusive write lock returning whether or not it was
+ * acquired. Note that the exclusive lock may be acquired some time later after
+ * this method has been invoked due to the current lock owner going away.
+ */
+ public synchronized boolean lock() throws KeeperException, InterruptedException {
+ if (isClosed()) {
+ return false;
+ }
+ ensurePathExists(dir);
+
+ return (Boolean) retryOperation(zop);
+ }
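+
+ // Illustrative usage sketch: the lock directory, listener and
+ // doProtectedWork() are made-up examples; "zk" is an assumed connected
+ // ZooKeeper client.
+ //
+ // WriteLock lock = new WriteLock(zk, "/locks/mytable", null);
+ // lock.setLockListener(listener); // optional callback on acquisition
+ // if (lock.lock()) {
+ // try { doProtectedWork(); } finally { lock.unlock(); }
+ // } else {
+ // // not the owner yet; lockAcquired() fires once the lock is granted
+ // }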
+
+ /**
+ * return the parent dir for lock
+ * @return the parent dir used for locks.
+ */
+ public String getDir() {
+ return dir;
+ }
+
+ /**
+ * Returns true if this node is the owner of the
+ * lock (or the leader)
+ */
+ public boolean isOwner() {
+ return id != null && ownerId != null && id.equals(ownerId);
+ }
+
+ /**
+ * return the id for this lock
+ * @return the id for this lock
+ */
+ public String getId() {
+ return this.id;
+ }
+}
+
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java (revision 0)
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Represents an ephemeral znode name which has an ordered sequence number
+ * and can be sorted in order
+ * This class has been used as-is from the zookeeper 3.4.0 recipes with a
+ * change in package name.
+ */
+ public class ZNodeName implements Comparable<ZNodeName> {
+ private final String name;
+ private String prefix;
+ private int sequence = -1;
+ private static final Logger LOG = Logger.getLogger(ZNodeName.class);
+
+ public ZNodeName(String name) {
+ if (name == null) {
+ throw new NullPointerException("id cannot be null");
+ }
+ this.name = name;
+ this.prefix = name;
+ int idx = name.lastIndexOf('-');
+ if (idx >= 0) {
+ this.prefix = name.substring(0, idx);
+ try {
+ this.sequence = Integer.parseInt(name.substring(idx + 1));
+ // If an exception occurred we misdetected a sequence suffix,
+ // so return -1.
+ } catch (NumberFormatException e) {
+ LOG.info("Number format exception for " + idx, e);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ LOG.info("Array out of bounds for " + idx, e);
+ }
+ }
+ }
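+
+ // Worked example (illustrative): for the name "x-123456789-0000000007" the
+ // prefix becomes "x-123456789" and the sequence becomes 7; a name with no
+ // numeric suffix keeps sequence = -1 and falls back to plain name comparison
+ // in compareTo().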
+
+ @Override
+ public String toString() {
+ return name.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ZNodeName sequence = (ZNodeName) o;
+
+ if (!name.equals(sequence.name)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode() + 37;
+ }
+
+ public int compareTo(ZNodeName that) {
+ int answer = this.prefix.compareTo(that.prefix);
+ if (answer == 0) {
+ int s1 = this.sequence;
+ int s2 = that.sequence;
+ if (s1 == -1 && s2 == -1) {
+ return this.name.compareTo(that.name);
+ }
+ answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2;
+ }
+ return answer;
+ }
+
+ /**
+ * Returns the name of the znode
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the sequence number
+ */
+ public int getZNodeName() {
+ return sequence;
+ }
+
+ /**
+ * Returns the text prefix before the sequence number
+ */
+ public String getPrefix() {
+ return prefix;
+ }
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java (revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java (revision 0)
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A callback object which can be used for implementing retry-able operations in the
+ * {@link org.apache.hcatalog.hbase.snapshot.lock.ProtocolSupport} class
+ * This class has been used as-is from the zookeeper 3.4.0 recipes with a
+ * change in the package name.
+ */
+public interface ZooKeeperOperation {
+
+ /**
+ * Performs the operation - which may be invoked multiple times if the connection
+ * to ZooKeeper closes during this operation
+ *
+ * @return the result of the operation or null
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public boolean execute() throws KeeperException, InterruptedException;
+}
Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java
===================================================================
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java (revision 0)
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java (revision 0)
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * test for writelock
+ * This class is taken from the zookeeper 3.4.0 as-is to test the zookeeper lock
+ * Recipe with a change in the package name.
+ */
+public class WriteLockTest extends ClientBase {
+ protected int sessionTimeout = 10 * 1000;
+ protected String dir = "/" + getClass().getName();
+ protected WriteLock[] nodes;
+ protected CountDownLatch latch = new CountDownLatch(1);
+ private boolean restartServer = true;
+ private boolean workAroundClosingLastZNodeFails = true;
+ private boolean killLeader = true;
+
+ @Test
+ public void testRun() throws Exception {
+ runTest(3);
+ }
+
+ class LockCallback implements LockListener {
+ public void lockAcquired() {
+ latch.countDown();
+ }
+
+ public void lockReleased() {
+
+ }
+
+ }
+ protected void runTest(int count) throws Exception {
+ nodes = new WriteLock[count];
+ for (int i = 0; i < count; i++) {
+ ZooKeeper keeper = createClient();
+ WriteLock leader = new WriteLock(keeper, dir, null);
+ leader.setLockListener(new LockCallback());
+ nodes[i] = leader;
+
+ leader.lock();
+ }
+
+ // lets wait for any previous leaders to die and one of our new
+ // nodes to become the new leader
+ latch.await(30, TimeUnit.SECONDS);
+
+ WriteLock first = nodes[0];
+ dumpNodes(count);
+
+ // lets assert that the first election is the leader
+ Assert.assertTrue("The first znode should be the leader " + first.getId(), first.isOwner());
+
+ for (int i = 1; i < count; i++) {
+ WriteLock node = nodes[i];
+ Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner());
+ }
+
+ if (count > 1) {
+ if (killLeader) {
+ System.out.println("Now killing the leader");
+ // now lets kill the leader
+ latch = new CountDownLatch(1);
+ first.unlock();
+ latch.await(30, TimeUnit.SECONDS);
+ //Thread.sleep(10000);
+ WriteLock second = nodes[1];
+ dumpNodes(count);
+ // lets assert that the first election is the leader
+ Assert.assertTrue("The second znode should be the leader " + second.getId(), second.isOwner());
+
+ for (int i = 2; i < count; i++) {
+ WriteLock node = nodes[i];
+ Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner());
+ }
+ }
+
+
+ if (restartServer) {
+ // now lets stop the server
+ System.out.println("Now stopping the server");
+ stopServer();
+ Thread.sleep(10000);
+
+ // TODO lets assert that we are no longer the leader
+ dumpNodes(count);
+
+ System.out.println("Starting the server");
+ startServer();
+ Thread.sleep(10000);
+
+ for (int i = 0; i < count - 1; i++) {
+ System.out.println("Calling acquire for node: " + i);
+ nodes[i].lock();
+ }
+ dumpNodes(count);
+ System.out.println("Now closing down...");
+ }
+ }
+ }
+
+ protected void dumpNodes(int count) {
+ for (int i = 0; i < count; i++) {
+ WriteLock node = nodes[i];
+ System.out.println("node: " + i + " id: " +
+ node.getId() + " is leader: " + node.isOwner());
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (nodes != null) {
+ for (int i = 0; i < nodes.length; i++) {
+ WriteLock node = nodes[i];
+ if (node != null) {
+ System.out.println("Closing node: " + i);
+ node.close();
+ if (workAroundClosingLastZNodeFails && i == nodes.length - 1) {
+ System.out.println("Not closing zookeeper: " + i + " due to bug!");
+ } else {
+ System.out.println("Closing zookeeper: " + i);
+ node.getZookeeper().close();
+ System.out.println("Closed zookeeper: " + i);
+ }
+ }
+ }
+ }
+ System.out.println("Now lets stop the server");
+ super.tearDown();
+
+ }
+}
Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java
===================================================================
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java (revision 0)
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java (revision 0)
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import junit.framework.TestCase;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.junit.Test;
+
+/**
+ * test for znodenames. This class is taken as-is from zookeeper lock recipe test.
+ * The package name has been changed.
+ */
+public class ZNodeNameTest extends TestCase {
+ @Test
+ public void testOrderWithSamePrefix() throws Exception {
+ String[] names = { "x-3", "x-5", "x-11", "x-1" };
+ String[] expected = { "x-1", "x-3", "x-5", "x-11" };
+ assertOrderedNodeNames(names, expected);
+ }
+ @Test
+ public void testOrderWithDifferentPrefixes() throws Exception {
+ String[] names = { "r-3", "r-2", "r-1", "w-2", "w-1" };
+ String[] expected = { "r-1", "r-2", "r-3", "w-1", "w-2" };
+ assertOrderedNodeNames(names, expected);
+ }
+
+ protected void assertOrderedNodeNames(String[] names, String[] expected) {
+ int size = names.length;
+ assertEquals("The two arrays should be the same size!", names.length, expected.length);
+ SortedSet<ZNodeName> nodeNames = new TreeSet<ZNodeName>();
+ for (String name : names) {
+ nodeNames.add(new ZNodeName(name));
+ }
+
+ int index = 0;
+ for (ZNodeName nodeName : nodeNames) {
+ String name = nodeName.getName();
+ assertEquals("Node " + index, expected[index++], name);
+ }
+ }
+
+}
Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java
===================================================================
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java (revision 0)
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java (revision 0)
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class IDGenClient extends Thread {
+
+ String connectionStr;
+ String base_dir;
+ ZKUtil zkutil;
+ Random sleepTime = new Random();
+ int runtime;
+ HashMap<Long, Long> idMap;
+ String tableName;
+
+ IDGenClient(String connectionStr, String base_dir, int time, String tableName) {
+ super();
+ this.connectionStr = connectionStr;
+ this.base_dir = base_dir;
+ this.zkutil = new ZKUtil(connectionStr, base_dir);
+ this.runtime = time;
+ idMap = new HashMap<Long, Long>();
+ this.tableName = tableName;
+ }
+
+ /*
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ long startTime = System.currentTimeMillis();
+ int timeElapsed = 0;
+ while( timeElapsed <= runtime){
+ try {
+ long id = zkutil.nextId(tableName);
+ idMap.put(System.currentTimeMillis(), id);
+
+ int sTime = sleepTime.nextInt(2);
+ Thread.sleep(sTime * 100);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ timeElapsed = (int) Math.ceil((System.currentTimeMillis() - startTime)/(double)1000);
+ }
+
+ }
+
+ Map<Long, Long> getIdMap(){
+ return idMap;
+ }
+
+}
Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java
===================================================================
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java (revision 0)
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java (revision 0)
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestIDGenerator extends SkeletonHBaseTest{
+
+ @Test
+ public void testIDGeneration() throws Exception {
+
+ int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+ String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+ String[] splits = servers.split(",");
+ StringBuffer sb = new StringBuffer();
+ for(String split : splits){
+ sb.append(split);
+ sb.append(':');
+ sb.append(port);
+ }
+ ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base");
+
+ String tableName = "myTable";
+ long initId = zkutil.nextId(tableName);
+ for (int i=0; i<10; i++) {
+ long id = zkutil.nextId(tableName);
+ Assert.assertEquals(initId + (i + 1), id);
+ }
+ }
+
+ @Test
+ public void testMultipleClients() throws InterruptedException{
+
+ int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+ String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        String[] splits = servers.split(",");
+        StringBuilder sb = new StringBuilder();
+        for (String split : splits) {
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            sb.append(split);
+            sb.append(':');
+            sb.append(port);
+        }
+
+        ArrayList<IDGenClient> clients = new ArrayList<IDGenClient>();
+
+ for(int i =0; i < 5; i++){
+ IDGenClient idClient = new IDGenClient(sb.toString(), "/rm_base", 10, "testTable");
+ clients.add(idClient);
+ }
+
+        // Start the clients as separate threads; invoking run() directly would
+        // execute them one after another on this thread and defeat the purpose
+        // of a multi-client test.
+        for (IDGenClient idClient : clients) {
+            idClient.start();
+        }
+
+ for(IDGenClient idClient : clients){
+ idClient.join();
+ }
+
+        // Two clients can record an ID within the same millisecond, so merging the
+        // timestamp-keyed maps could silently drop entries. Instead, collect the IDs
+        // themselves and check that, across all clients, they form a duplicate-free,
+        // contiguous sequence starting at 1.
+        ArrayList<Long> allIds = new ArrayList<Long>();
+        for (IDGenClient idClient : clients) {
+            allIds.addAll(idClient.getIdMap().values());
+        }
+        Collections.sort(allIds);
+        long expectedId = 1;
+        for (Long id : allIds) {
+            assertTrue(id == expectedId);
+            expectedId++;
+        }
+ }
+}
Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java
===================================================================
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (revision 0)
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (revision 0)
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
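+/**
+ * Tests for the ZooKeeper-based revision manager: znode setup, commit and abort
+ * of write transactions, transaction expiry, and snapshot creation.
+ */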
+public class TestRevisionManager extends SkeletonHBaseTest{
+
+ @Test
+ public void testBasicZNodeCreation() throws IOException, KeeperException, InterruptedException{
+
+ int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+ String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        String[] splits = servers.split(",");
+        // ZooKeeper expects a comma-separated list of host:port pairs.
+        StringBuilder sb = new StringBuilder();
+        for (String split : splits) {
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            sb.append(split);
+            sb.append(':');
+            sb.append(port);
+        }
+
+ ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base");
+ String tableName = newTableName("testTable");
+        List<String> columnFamilies = Arrays.asList("cf001", "cf002", "cf003");
+
+ zkutil.createRootZNodes();
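+        // createRootZNodes() should have created the transaction-data directory and
+        // the clock node under the /rm_base base directory.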
+ ZooKeeper zk = zkutil.getSession();
+        Stat dataDirStat = zk.exists("/rm_base" + PathUtil.DATA_DIR, false);
+        assertTrue(dataDirStat != null);
+        Stat clockNodeStat = zk.exists("/rm_base" + PathUtil.CLOCK_NODE, false);
+        assertTrue(clockNodeStat != null);
+
+ zkutil.setUpZnodesForTable(tableName, columnFamilies);
+ String transactionDataTablePath = "/rm_base" + PathUtil.DATA_DIR + "/" + tableName;
+ Stat result = zk.exists(transactionDataTablePath, false);
+ assertTrue(result != null);
+
+        for (String colFamily : columnFamilies) {
+            String cfPath = transactionDataTablePath + "/" + colFamily;
+            Stat cfStat = zk.exists(cfPath, false);
+            assertTrue(cfStat != null);
+        }
+
+ }
+
+ @Test
+ public void testCommitTransaction() throws IOException{
+
+ int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+ String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        String[] splits = servers.split(",");
+        StringBuilder sb = new StringBuilder();
+        for (String split : splits) {
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            sb.append(split);
+            sb.append(':');
+            sb.append(port);
+        }
+
+ Properties props = new Properties();
+ props.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
+ props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+ ZKBasedRevisionManager manager = new ZKBasedRevisionManager();
+ manager.initialize(props);
+ manager.open();
+ ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base");
+
+ String tableName = newTableName("testTable");
+        List<String> columnFamilies = Arrays.asList("cf1", "cf2", "cf3");
+ Transaction txn = manager.beginWriteTransaction(tableName,
+ columnFamilies);
+
+        List<String> cfs = zkutil.getColumnFamiliesOfTable(tableName);
+ assertTrue(cfs.size() == columnFamilies.size());
+ for (String cf : cfs){
+ assertTrue(columnFamilies.contains(cf));
+ }
+
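+        // While the transaction is open, each participating column family's
+        // running-transaction node should hold exactly one entry matching the
+        // transaction's revision and expiry timestamp.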
+ for(String colFamily : columnFamilies){
+ String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily);
+ byte[] data = zkutil.getRawData(path, null);
+ StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+ ZKUtil.deserialize(list, data);
+ assertEquals(list.getRevisionListSize(), 1);
+ StoreFamilyRevision lightTxn = list.getRevisionList().get(0);
+            assertEquals(lightTxn.getTimestamp(), txn.getTransactionExpireTimeStamp());
+            assertEquals(lightTxn.getRevision(), txn.getRevisionNumber());
+
+ }
+ manager.commitWriteTransaction(txn);
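+        // After the commit, the running-transaction list of every column family
+        // should be empty again.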
+        for (String colFamily : columnFamilies) {
+            String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily);
+ byte[] data = zkutil.getRawData(path, null);
+ StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+ ZKUtil.deserialize(list, data);
+ assertEquals(list.getRevisionListSize(), 0);
+
+ }
+
+ manager.close();
+ }
+
+ @Test
+ public void testAbortTransaction() throws IOException{
+
+ int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+ String host = getHbaseConf().get("hbase.zookeeper.quorum");
+ Properties props = new Properties();
+ props.put(ZKBasedRevisionManager.HOSTLIST, host + ':' + port);
+ props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+ ZKBasedRevisionManager manager = new ZKBasedRevisionManager();
+ manager.initialize(props);
+ manager.open();
+ ZKUtil zkutil = new ZKUtil(host + ':' + port, "/rm_base");
+
+ String tableName = newTableName("testTable");
+        List<String> columnFamilies = Arrays.asList("cf1", "cf2", "cf3");
+ Transaction txn = manager.beginWriteTransaction(tableName, columnFamilies);
+        List<String> cfs = zkutil.getColumnFamiliesOfTable(tableName);
+
+ assertTrue(cfs.size() == columnFamilies.size());
+ for (String cf : cfs){
+ assertTrue(columnFamilies.contains(cf));
+ }
+
+        for (String colFamily : columnFamilies) {
+            String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily);
+ byte[] data = zkutil.getRawData(path, null);
+ StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+ ZKUtil.deserialize(list, data);
+ assertEquals(list.getRevisionListSize(), 1);
+ StoreFamilyRevision lightTxn = list.getRevisionList().get(0);
+            assertEquals(lightTxn.getTimestamp(), txn.getTransactionExpireTimeStamp());
+            assertEquals(lightTxn.getRevision(), txn.getRevisionNumber());
+
+ }
+ manager.abortWriteTransaction(txn);
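+        // Aborting should clear the transaction from every column family's
+        // running-transaction list.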
+        for (String colFamily : columnFamilies) {
+            String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily);
+ byte[] data = zkutil.getRawData(path, null);
+ StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+ ZKUtil.deserialize(list, data);
+ assertEquals(list.getRevisionListSize(), 0);
+
+ }
+
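+        // The aborted revision should also be recorded under each column family's
+        // abort-information path.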
+        for (String colFamily : columnFamilies) {
+            String path = PathUtil.getAbortInformationPath("/rm_base", tableName, colFamily);
+ byte[] data = zkutil.getRawData(path, null);
+ StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+ ZKUtil.deserialize(list, data);
+ assertEquals(list.getRevisionListSize(), 1);
+ StoreFamilyRevision abortedTxn = list.getRevisionList().get(0);
+ assertEquals(abortedTxn.getRevision(), txn.getRevisionNumber());
+ }
+ manager.close();
+ }
+
+ @Test
+ public void testKeepAliveTransaction() throws InterruptedException, IOException {
+
+ int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+ String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        String[] splits = servers.split(",");
+        StringBuilder sb = new StringBuilder();
+        for (String split : splits) {
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            sb.append(split);
+            sb.append(':');
+            sb.append(port);
+        }
+
+ Properties props = new Properties();
+ props.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
+ props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+ ZKBasedRevisionManager manager = new ZKBasedRevisionManager();
+ manager.initialize(props);
+ manager.open();
+ String tableName = newTableName("testTable");
+        List<String> columnFamilies = Arrays.asList("cf1", "cf2");
+ Transaction txn = manager.beginWriteTransaction(tableName,
+ columnFamilies, 40);
+ Thread.sleep(100);
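+        // If the transaction's keep-alive window has already lapsed, the commit is
+        // expected to fail because the revision manager has removed the transaction
+        // from its running-transaction data; the catch block below checks that error.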
+ try {
+ manager.commitWriteTransaction(txn);
+ } catch (Exception e) {
+ assertTrue(e instanceof IOException);
+ assertEquals(e.getMessage(),
+ "The transaction to be removed not found in the data.");
+ }
+
+ }
+
+ @Test
+ public void testCreateSnapshot() throws IOException{
+ int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+ String host = getHbaseConf().get("hbase.zookeeper.quorum");
+ Properties props = new Properties();
+ props.put(ZKBasedRevisionManager.HOSTLIST, host + ':' + port);
+ props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+ ZKBasedRevisionManager manager = new ZKBasedRevisionManager();
+ manager.initialize(props);
+ manager.open();
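+        // Interleave write transactions over overlapping column families and verify
+        // the per-family revisions reported by each snapshot along the way.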
+ String tableName = newTableName("testTable");
+        List<String> cfOne = Arrays.asList("cf1", "cf2");
+        List<String> cfTwo = Arrays.asList("cf2", "cf3");
+ Transaction tsx1 = manager.beginWriteTransaction(tableName, cfOne);
+ Transaction tsx2 = manager.beginWriteTransaction(tableName, cfTwo);
+ TableSnapshot snapshotOne = manager.createSnapshot(tableName);
+ assertEquals(snapshotOne.getRevision("cf1"), 0);
+ assertEquals(snapshotOne.getRevision("cf2"), 0);
+ assertEquals(snapshotOne.getRevision("cf3"), 1);
+
+        List<String> cfThree = Arrays.asList("cf1", "cf3");
+ Transaction tsx3 = manager.beginWriteTransaction(tableName, cfThree);
+ manager.commitWriteTransaction(tsx1);
+ TableSnapshot snapshotTwo = manager.createSnapshot(tableName);
+ assertEquals(snapshotTwo.getRevision("cf1"), 2);
+ assertEquals(snapshotTwo.getRevision("cf2"), 1);
+ assertEquals(snapshotTwo.getRevision("cf3"), 1);
+
+ manager.commitWriteTransaction(tsx2);
+ TableSnapshot snapshotThree = manager.createSnapshot(tableName);
+ assertEquals(snapshotThree.getRevision("cf1"), 2);
+ assertEquals(snapshotThree.getRevision("cf2"), 3);
+ assertEquals(snapshotThree.getRevision("cf3"), 2);
+ manager.commitWriteTransaction(tsx3);
+ TableSnapshot snapshotFour = manager.createSnapshot(tableName);
+ assertEquals(snapshotFour.getRevision("cf1"), 3);
+ assertEquals(snapshotFour.getRevision("cf2"), 3);
+ assertEquals(snapshotFour.getRevision("cf3"), 3);
+
+ }
+
+
+}
Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java
===================================================================
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java (revision 0)
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java (revision 0)
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.junit.Test;
+
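+/**
+ * Round-trip serialization tests for the Thrift-generated revision types using
+ * the serialize/deserialize helpers in {@link ZKUtil}.
+ */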
+public class TestThriftSerialization {
+
+ @Test
+ public void testLightWeightTransaction(){
+ StoreFamilyRevision trxn = new StoreFamilyRevision(0, 1000);
+ try {
+
+ byte[] data = ZKUtil.serialize(trxn);
+ StoreFamilyRevision newWtx = new StoreFamilyRevision();
+ ZKUtil.deserialize(newWtx, data);
+
+ assertTrue(newWtx.getRevision() == trxn.getRevision());
+ assertTrue(newWtx.getTimestamp() == trxn.getTimestamp());
+
+        } catch (IOException e) {
+            // Serialization should never fail here; report it as a test failure
+            // instead of silently swallowing the exception.
+            fail("Thrift round trip failed: " + e.getMessage());
+        }
+ }
+
+ @Test
+ public void testWriteTransactionList(){
+        List<StoreFamilyRevision> txnList = new ArrayList<StoreFamilyRevision>();
+ long version;
+ long timestamp;
+ for( int i = 0; i < 10; i++){
+ version = i;
+ timestamp = 1000 + i;
+ StoreFamilyRevision wtx = new StoreFamilyRevision(version, timestamp);
+ txnList.add(wtx);
+ }
+
+ StoreFamilyRevisionList wList = new StoreFamilyRevisionList(txnList);
+
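+        // Serialize the populated list, deserialize into a fresh object, and verify
+        // that every entry survives the round trip.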
+ try {
+ byte[] data = ZKUtil.serialize(wList);
+ StoreFamilyRevisionList newList = new StoreFamilyRevisionList();
+ ZKUtil.deserialize(newList, data);
+ assertTrue(newList.getRevisionListSize() == wList.getRevisionListSize());
+
+            Iterator<StoreFamilyRevision> itr = newList.getRevisionListIterator();
+            int i = 0;
+            while (itr.hasNext()) {
+                StoreFamilyRevision txn = itr.next();
+ assertTrue(txn.getRevision() == i);
+ assertTrue(txn.getTimestamp() == (i + 1000));
+ i++;
+ }
+
+        } catch (IOException e) {
+            fail("Thrift round trip failed: " + e.getMessage());
+        }
+ }
+
+}