diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 44d9a57..ef8b126 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -732,6 +732,7 @@
HIVE_STATS_DESERIALIZATION_FACTOR("hive.stats.deserialization.factor", (float) 1.0),
// Concurrency
+ //HIVE-6207
HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false),
HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"),
HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100),
@@ -748,6 +749,8 @@
// Transactions
HIVE_TXN_MANAGER("hive.txn.manager",
"org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"),
+ //HIVE-6207
+// "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"),
// time after which transactions are declared aborted if the client has
// not sent a heartbeat, in seconds.
HIVE_TXN_TIMEOUT("hive.txn.timeout", 300),
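For reference, a minimal client-side sketch of the configuration that the new TransactionContext (further down in this patch) checks before engaging ACID locking. The class name is an illustration only and is not part of this patch; in practice these settings would normally come from hive-site.xml.

import org.apache.hadoop.hive.conf.HiveConf;

public class AcidClientConfSketch {
  public static HiveConf acidEnabledConf() {
    HiveConf conf = new HiveConf();
    // TransactionContext only engages locking when both of these hold
    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER,
        "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
    return conf;
  }
}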
diff --git hcatalog/core/pom.xml hcatalog/core/pom.xml
index b5e85cd..9a81e8c 100644
--- hcatalog/core/pom.xml
+++ hcatalog/core/pom.xml
@@ -71,6 +71,12 @@
<artifactId>jackson-mapper-asl</artifactId>
<version>${jackson.version}</version>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito-all.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
index f41336b..36c7e2e 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.state.StateProvider;
@@ -33,14 +34,29 @@
/**
* This abstract class is internal to HCatalog and abstracts away the notion of
* underlying system from which reads will be done.
+ *
+ * Note that since 0.14 Hive provides ACID semantics, so HCatalog must interact with Hive's
+ * Transaction Manager; implementations of HCatReader must therefore use
+ * {@link org.apache.hive.hcatalog.mapreduce.TransactionContext} as documented in this class.
+ * Furthermore, the HCatReader created on the master must stay around for the duration of the
+ * operation. More specifically, it's the TransactionContext that must stay around so that the
+ * transaction does not time out.
+ * The "mapreduce.job.user.name" property must be set. (is this enough for TxManager to restore state?)
+ * Note that HCatReader and HCatWriter are never part of the same transaction.
+ * {@link org.apache.hive.hcatalog.data.transfer.ReaderContext} can be used to restore an HCatReader
+ * on the master in case of a master crash/restart. The restored instance can then be used to close
+ * the transaction which was started before the crash. Note that the restart must happen quickly
+ * enough that the transaction does not time out.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class HCatReader {
/**
- * This should be called at master node to obtain {@link ReaderContext} which
- * then should be serialized and sent to slave nodes.
+ * This should be called at the master node to obtain a {@link org.apache.hive.hcatalog.data.transfer.ReaderContext}
+ * which should then be serialized and sent to slave nodes. Implementations of this method
+ * must call {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onSetupJob(org.apache.hadoop.mapreduce.JobContext)}
+ * to acquire appropriate locks on the resources being read.
*
* @return {@link ReaderContext}
* @throws HCatException
@@ -56,6 +72,13 @@
public abstract Iterator<HCatRecord> read() throws HCatException;
/**
+ * This must be called on the master node once reading is complete. Implementations of this method
+ * must call {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onCommitJob(org.apache.hadoop.mapreduce.JobContext)}
+ * to release resources.
+ * @throws HCatException
+ */
+ public abstract void close(ReaderContext ctx) throws HCatException;
+ /**
* This constructor will be invoked by {@link DataTransferFactory} at master
* node. Don't use this constructor. Instead, use {@link DataTransferFactory}
*
@@ -80,6 +103,9 @@ protected HCatReader(final Configuration config, StateProvider sp) {
this.sp = sp;
}
+ protected HCatReader(final ReaderContext ctx) {
+ //todo: not finished
+ }
protected ReadEntity re; // This will be null at slaves.
protected Configuration conf;
protected ReaderContext info;
@@ -95,4 +121,12 @@ private HCatReader(final Map config) {
this.conf = conf;
}
+ /**
+ * TODO: move this to a different package; probably move to a Shim class
+ * and make the seeds static and then auto-increment.
+ * @return a new randomly generated JobID
+ */
+ public static JobID createJobID() {
+ return new JobID(Long.toString((long)(Math.random() * Long.MAX_VALUE)), (int)(Math.random() * 10000));
+ }
}
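For illustration, a minimal sketch of the read lifecycle the Javadoc above describes. The table name "mytbl" and the single-JVM loop are assumptions; in a real deployment the ReaderContext is serialized (it is Externalizable) and shipped to slave nodes, and close() is the new method added by this patch.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;

public class ReadLifecycleSketch {
  public static void main(String[] args) throws HCatException {
    Map<String, String> config = new HashMap<String, String>();
    ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();

    // master: prepareRead() is where onSetupJob() acquires the read locks
    HCatReader masterReader = DataTransferFactory.getHCatReader(entity, config);
    ReaderContext ctx = masterReader.prepareRead();

    // slaves: each slave rebuilds a reader for its split number from the shipped context
    for (int slave = 0; slave < ctx.numSplits(); slave++) {
      HCatReader slaveReader = DataTransferFactory.getHCatReader(ctx, slave);
      Iterator<HCatRecord> records = slaveReader.read();
      while (records.hasNext()) {
        System.out.println(records.next());
      }
    }

    // master: close() releases locks / ends the read transaction
    masterReader.close(ctx);
  }
}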
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatWriter.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatWriter.java
index da6ad5b..8709934 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatWriter.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatWriter.java
@@ -34,6 +34,8 @@
* This abstraction is internal to HCatalog. This is to facilitate writing to
* HCatalog from external systems. Don't try to instantiate this directly.
* Instead, use {@link DataTransferFactory}
+ *
+ * @see org.apache.hive.hcatalog.data.transfer.HCatReader
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -46,6 +48,9 @@
/**
* External system should invoke this method exactly once from a master node.
+ * Implementations of this method must call
+ * {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onSetupJob(org.apache.hadoop.mapreduce.JobContext)}
+ * to acquire appropriate locks on the resources being written.
*
* @return {@link WriterContext} This should be serialized and sent to slave
* nodes to construct HCatWriter there.
@@ -65,7 +70,9 @@ public abstract void write(final Iterator recordItr)
/**
* This method should be called at master node. Primary purpose of this is to
- * do metadata commit.
+ * do metadata commit. Implementations of this method must call
+ * {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onCommitJob(org.apache.hadoop.mapreduce.JobContext)}
+ * to release resources.
*
* @throws {@link HCatException}
*/
@@ -73,9 +80,11 @@ public abstract void write(final Iterator recordItr)
/**
* This method should be called at master node. Primary purpose of this is to
- * do cleanups in case of failures.
+ * do cleanups in case of failures. Implementations of this method must call
+ * {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onAbortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
+ * to release resources.
*
- * @throws {@link HCatException} *
+ * @throws {@link HCatException}
*/
public abstract void abort(final WriterContext context) throws HCatException;
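Likewise, a minimal sketch of the write lifecycle described above: prepareWrite() on the master opens the transaction and acquires locks, write() runs on slaves, and commit()/abort() on the master end the transaction. The table name "mytbl" and the empty record iterator are placeholders.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;

public class WriteLifecycleSketch {
  public static void main(String[] args) throws HCatException {
    Map<String, String> config = new HashMap<String, String>();
    WriteEntity entity = new WriteEntity.Builder().withTable("mytbl").build();

    // master: prepareWrite() is where onSetupJob() opens the txn and acquires locks
    HCatWriter masterWriter = DataTransferFactory.getHCatWriter(entity, config);
    WriterContext ctx = masterWriter.prepareWrite();

    try {
      // slave: rebuild a writer from the (normally serialized) WriterContext and push records
      HCatWriter slaveWriter = DataTransferFactory.getHCatWriter(ctx);
      Iterator<HCatRecord> records = new ArrayList<HCatRecord>().iterator(); // placeholder records
      slaveWriter.write(records);
      // master: commit() maps to onCommitJob() -> COMMIT TRANSACTION
      masterWriter.commit(ctx);
    } catch (HCatException e) {
      // master: abort() maps to onAbortJob() -> ABORT TRANSACTION
      masterWriter.abort(ctx);
      throw e;
    }
  }
}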
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
index edf3654..c501d0a 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
@@ -19,6 +19,8 @@
package org.apache.hive.hcatalog.data.transfer;
+import org.apache.hadoop.mapreduce.JobID;
+
import java.io.Externalizable;
@@ -27,6 +29,8 @@
* to the slaves. The contents of the class are opaque to the client. This
* interface extends {@link java.io.Externalizable} so that implementing
* classes can be serialized using standard Java mechanisms.
+ *
+ *
*/
public interface ReaderContext extends Externalizable {
@@ -37,5 +41,5 @@
* @return number of splits
*/
public int numSplits();
-
+ public JobID getJobID();
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
index bd47c35..96bfe12 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
@@ -19,6 +19,8 @@
package org.apache.hive.hcatalog.data.transfer;
+import org.apache.hadoop.mapreduce.JobID;
+
import java.io.Externalizable;
/**
@@ -29,5 +31,5 @@
* make it available to slaves to prepare for writes.
*/
public interface WriterContext extends Externalizable {
-
+ public JobID getJobID();
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
index 8669ded..d23e311 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
@@ -24,10 +24,12 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -39,6 +41,7 @@
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.data.transfer.state.StateProvider;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.TransactionContext;
/**
* This reader reads via {@link HCatInputFormat}
@@ -47,6 +50,7 @@
public class HCatInputFormatReader extends HCatReader {
private InputSplit split;
+ private TransactionContext txnCtx = null;
public HCatInputFormatReader(ReaderContext context, int slaveNumber,
StateProvider sp) {
@@ -58,6 +62,16 @@ public HCatInputFormatReader(ReadEntity info, Map config) {
super(info, config);
}
+ /**
+ * This is meant to be used to recover the reader on the master node after a crash/restart.
+ * Not finished.
+ * @param context the ReaderContext restored from its serialized form
+ */
+ public HCatInputFormatReader(ReaderContext context) {
+ super(((ReaderContextImpl)context).getConf(), null);
+ info = context;
+ }
+
@Override
public ReaderContext prepareRead() throws HCatException {
try {
@@ -65,9 +79,12 @@ public ReaderContext prepareRead() throws HCatException {
HCatInputFormat hcif = HCatInputFormat.setInput(
job, re.getDbName(), re.getTableName(), re.getFilterString());
ReaderContextImpl cntxt = new ReaderContextImpl();
+ JobContext jc = ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(), cntxt.getJobID());
cntxt.setInputSplits(hcif.getSplits(
- ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(), null)));
+ jc));
cntxt.setConf(job.getConfiguration());
+ txnCtx = new TransactionContext(jc);
+ txnCtx.onSetupJob(jc);
return cntxt;
} catch (IOException e) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
@@ -93,6 +110,28 @@ public ReaderContext prepareRead() throws HCatException {
return new HCatRecordItr(rr);
}
+ @Override
+ public void close(ReaderContext ctx) throws HCatException {
+ if(txnCtx == null) {
+ //So now what? Either the client died and this is a retry, in which case we should
+ //restore the TransactionContext; or this is user error and close() was called before
+ //prepareRead(); or worse, it was called on a slave, in which case we really
+ //don't want to call into the TransactionContext.
+ return;
+ }
+ try {
+ //what's the right way to create context?
+ txnCtx.onCommitJob(createJobContext(conf, ctx));
+ }
+ catch(IOException ex) {
+ throw new HCatException("WTF?");
+ }
+ }
+ private static JobContext createJobContext(Configuration conf, ReaderContext ctx) throws IOException {
+ return ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+ new Job(conf).getConfiguration(), ctx.getJobID());
+ }
private static class HCatRecordItr implements Iterator<HCatRecord> {
private RecordReader<WritableComparable, HCatRecord> curRecReader;
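For illustration, a sketch of the intended (but, per the comments above, not yet finished) master-recovery path: the restarted master deserializes the ReaderContext it persisted earlier and closes the transaction opened before the crash. The class name and file handling are placeholders, and this assumes the HCatInputFormatReader(ReaderContext) constructor is completed.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.data.transfer.impl.HCatInputFormatReader;

public class MasterRecoverySketch {
  public static void recover(String savedContextFile)
      throws IOException, ClassNotFoundException, HCatException {
    // the master persisted the ReaderContext (Externalizable) before it crashed
    ObjectInputStream ois = new ObjectInputStream(new FileInputStream(savedContextFile));
    ReaderContext restored = (ReaderContext) ois.readObject(); // readExternal() also restores the JobID
    ois.close();

    // rebuild a reader from the restored context and close the dangling transaction
    // before it times out
    HCatReader recovered = new HCatInputFormatReader(restored);
    recovered.close(restored);
  }
}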
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
index cdbd829..67ff843 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
@@ -35,6 +35,7 @@
import org.apache.hive.hcatalog.common.ErrorType;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;
@@ -48,6 +49,7 @@
*/
public class HCatOutputFormatWriter extends HCatWriter {
+ private volatile OutputCommitter oc;
public HCatOutputFormatWriter(WriteEntity we, Map config) {
super(we, config);
}
@@ -56,25 +58,36 @@ public HCatOutputFormatWriter(WriterContext cntxt, StateProvider sp) {
super(((WriterContextImpl)cntxt).getConf(), sp);
}
+ public HCatOutputFormatWriter(WriterContext ctx) {
+ //this is meant to be called from DataTransferFactory when HCatWriter
+ //on Master is restored after a crash;
+ //todo: finish this
+ super(((WriterContextImpl)ctx).getConf(), null);
+ //it should recreate OutputCommitter and call setupJob() to
+ //restart heartbeat thread
+ }
@Override
public WriterContext prepareWrite() throws HCatException {
OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
we.getTableName(), we.getPartitionKVs());
Job job;
+ WriterContextImpl cntxt = new WriterContextImpl();
try {
job = new Job(conf);
+ //make sure that all OutputCommitter calls use the same job id
+ job.setJobID(cntxt.getJobID());
HCatOutputFormat.setOutput(job, jobInfo);
HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job.getConfiguration()));
HCatOutputFormat outFormat = new HCatOutputFormat();
outFormat.checkOutputSpecs(job);
- outFormat.getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
- job.getConfiguration(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID())).setupJob(job);
+ oc = outFormat.getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+ job.getConfiguration(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()));
+ oc.setupJob(job);
} catch (IOException e) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
} catch (InterruptedException e) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
}
- WriterContextImpl cntxt = new WriterContextImpl();
cntxt.setConf(job.getConfiguration());
return cntxt;
}
@@ -126,16 +139,10 @@ public void write(Iterator recordItr) throws HCatException {
public void commit(WriterContext context) throws HCatException {
WriterContextImpl cntxtImpl = (WriterContextImpl)context;
try {
- new HCatOutputFormat().getOutputCommitter(
- ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
- cntxtImpl.getConf(),
- ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
- .commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
- cntxtImpl.getConf(), null));
+ oc.commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+ cntxtImpl.getConf(), context.getJobID()));
} catch (IOException e) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
}
}
@@ -143,16 +150,10 @@ public void commit(WriterContext context) throws HCatException {
public void abort(WriterContext context) throws HCatException {
WriterContextImpl cntxtImpl = (WriterContextImpl)context;
try {
- new HCatOutputFormat().getOutputCommitter(
- ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
- cntxtImpl.getConf(),
- ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
- .abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
- cntxtImpl.getConf(), null), State.FAILED);
+ oc.abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+ cntxtImpl.getConf(), context.getJobID()), State.FAILED);
} catch (IOException e) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
}
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
index 89628a4..102076d 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
@@ -28,11 +28,13 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.mapreduce.HCatSplit;
/**
- * This class contains the list of {@link InputSplit}s obtained
+ * This class contains the list of {@link org.apache.hadoop.mapreduce.InputSplit}s obtained
* at master node and the configuration.
*/
class ReaderContextImpl implements ReaderContext, Configurable {
@@ -40,10 +42,12 @@
private static final long serialVersionUID = -2656468331739574367L;
private List<InputSplit> splits;
private Configuration conf;
+ private final JobID jobID;
public ReaderContextImpl() {
this.splits = new ArrayList<InputSplit>();
this.conf = new Configuration();
+ this.jobID = HCatReader.createJobID();
}
void setInputSplits(final List<InputSplit> splits) {
@@ -60,6 +64,11 @@ public int numSplits() {
}
@Override
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ @Override
public Configuration getConf() {
return conf;
}
@@ -76,6 +85,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
for (InputSplit split : splits) {
((HCatSplit) split).write(out);
}
+ jobID.write(out);
}
@Override
@@ -88,5 +98,6 @@ public void readExternal(ObjectInput in) throws IOException,
split.readFields(in);
splits.add(split);
}
+ jobID.readFields(in);
}
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
index 088d258..835a14e 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
@@ -26,6 +26,8 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.WriterContext;
/**
@@ -38,9 +40,11 @@
private static final long serialVersionUID = -5899374262971611840L;
private Configuration conf;
+ private final JobID jobID;
public WriterContextImpl() {
conf = new Configuration();
+ jobID = HCatReader.createJobID();
}
@Override
@@ -56,11 +60,17 @@ public void setConf(final Configuration config) {
@Override
public void writeExternal(ObjectOutput out) throws IOException {
conf.write(out);
+ jobID.write(out);
}
@Override
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
conf.readFields(in);
+ jobID.readFields(in);
+ }
+ @Override
+ public JobID getJobID() {
+ return jobID;
}
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/package.html hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/package.html
new file mode 100644
index 0000000..b85bd8c
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/package.html
@@ -0,0 +1,18 @@
+
+
+
+
+
+
+
+
+HCatalog DataTransfer API
+
+
+
+This API provides a way to read/write Hive tables without creating any MapReduce jobs.
+For details see HCatReader and HCatWriter.
+
+
+
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
index cead40d..6ac835e 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
@@ -66,6 +66,7 @@ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
@Override
public void setupJob(JobContext context) throws IOException {
getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
+ super.setupJob(context);
}
@Override
@@ -76,12 +77,14 @@ public void setupTask(TaskAttemptContext context) throws IOException {
@Override
public void abortJob(JobContext jobContext, State state) throws IOException {
getBaseOutputCommitter().abortJob(HCatMapRedUtil.createJobContext(jobContext), state);
+ super.abortJob(jobContext, state);
cleanupJob(jobContext);
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
+ super.commitJob(jobContext);
cleanupJob(jobContext);
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index 491da14..f6fc6a3 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -146,6 +146,7 @@ public void setupJob(JobContext context) throws IOException {
getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
}
// in dynamic usecase, called through FileRecordWriterContainer
+ super.setupJob(context);
}
@Override
@@ -199,6 +200,7 @@ public void abortJob(JobContext jobContext, State state) throws IOException {
if (!src.equals(tblPath)){
fs.delete(src, true);
}
+ super.abortJob(jobContext, state);
} finally {
cancelDelegationTokens(jobContext);
}
@@ -246,6 +248,7 @@ public void commitJob(JobContext jobContext) throws IOException {
}
}
}
+ super.commitJob(jobContext);
// Commit has succeeded (since no exceptions have been thrown.)
// Safe to cancel delegation tokens now.
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 360e77b..bea15e0 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -90,6 +90,10 @@ private InputJobInfo(String databaseName,
this.properties = properties == null ? new Properties() : properties;
}
+ public String getQualifiedTableName() {
+ return databaseName + "." + tableName;
+ }
+
/**
* Gets the value of databaseName
* @return the databaseName
@@ -183,4 +187,14 @@ private void readObject(ObjectInputStream ois)
new ObjectInputStream(new InflaterInputStream(ois));
partitions = (List)partInfoReader.readObject();
}
+
+
+ /**
+ * Exposed to enable testing of this class from other packages.
+ * @param partitions
+ */
+ public void setPartitionsForTesting(List<PartInfo> partitions) {
+ this.partitions = partitions;
+ }
+
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputCommitterContainer.java
index 2125e14..3154424 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputCommitterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputCommitterContainer.java
@@ -20,21 +20,26 @@
package org.apache.hive.hcatalog.mapreduce;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import java.io.IOException;
+
/**
* This class will contain an implementation of an OutputCommitter.
* See {@link OutputFormatContainer} for more information about containers.
*/
abstract class OutputCommitterContainer extends OutputCommitter {
private final org.apache.hadoop.mapred.OutputCommitter committer;
+ private final TransactionContext transactionContext;
/**
* @param context current JobContext
* @param committer OutputCommitter that this instance will contain
*/
- public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter committer) {
+ public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter committer) throws IOException {
this.committer = committer;
+ transactionContext = new TransactionContext(context);
}
/**
@@ -43,5 +48,16 @@ public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.Out
public OutputCommitter getBaseOutputCommitter() {
return committer;
}
-
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ transactionContext.onSetupJob(context);
+ }
+ @Override
+ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ transactionContext.onAbortJob(jobContext, state);
+ }
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ transactionContext.onCommitJob(jobContext);
+ }
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
index d50f43b..7cdd633 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
@@ -96,7 +96,9 @@ private OutputJobInfo(String databaseName,
this.partitionValues = partitionValues;
this.properties = new Properties();
}
-
+ public String getQualifiedTableName() {
+ return databaseName + "." + tableName;
+ }
/**
* @return the posOfPartCols
*/
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TransactionContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TransactionContext.java
new file mode 100644
index 0000000..f37eb4c
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TransactionContext.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Heartbeater;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.txn.HCatDbTxnManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * In order for MR jobs using HCatalog to support/play nice with Hive's ACID semantics they must
+ * interact with Hive Transaction Manager. This class implements the usual BEGIN TRANSACTION and
+ * (COMMIT|ABORT TRANSACTION) methods. It's the responsibility of the MR job to call these.
+ * In order to support fire and forget MR job submission, this class is designed to be invoked from
+ * the OutputCommitter of the job.
+ * Jobs using HCatOutputFormat will automatically make use of this. Jobs using HCatInputFormat and
+ * anything other than HCatOutputFormat must make sure that the OutputCommitter of the job calls
+ * {@link #onSetupJob(org.apache.hadoop.mapreduce.JobContext)},
+ * {@link #onCommitJob(org.apache.hadoop.mapreduce.JobContext)} and
+ * {@link #onAbortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}.
+ * A more detailed design note can be found in HIVE-6207.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TransactionContext {
+ /*Some implementation notes:
+ For the happy path, we can expect a single instance of OutputCommitter on which all lifecycle methods
+ are called. This instance is called from ApplicationMaster. In case of container failures
+ (or other conditions that cause job retries), we may end up initiating a TX from one instance of
+ TransactionContext and committing/aborting from another.
+ (is this actually true? When a job is retried, do all lifecycle methods get re-run?)
+
+ Use this to determine whether it's a retry:
+ return context.getConfiguration().getInt(
+ MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+
+ * todo:
+ * So far this implements the "happy path". Need to do some work to properly handle the retry of the job.
+ *
+ * Current design requires that jobs using HCatInputFormat but not HCatOutputFormat are modified in
+ * order to work with Hive's Transaction Manager
+ * how do we know if this is retried? Can we modify Configuration?
+ * think about retry logic; where should acquired locks be stored.
+ * What about transaction id?
+ * todo: Heartbeat (use org.apache.hadoop.hive.ql.exec.Heartbeater)
+ * Dependencies
+ * 1. ensure that TxManager is re-entrant
+ * 2. ensure that TxManager is stateless, i.e. we can acquire locks in one instance and release in another.
+ * */
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
+ private static final String ERR_MSG = "Transaction Manager error: ";
+ private final HCatDbTxnManager txnManager;
+ /**
+ * Use this to prevent making multiple close/commit/etc. calls to the
+ * TxnManager since some of the methods here may be called more than once.
+ */
+ private volatile boolean isTxClosed = false;
+ private final HiveConf hiveConf;
+ //this probably needs to be stored in Context as well
+ private boolean isReadOnlyTx = false;
+ /*should this be stored in Context?
+ * In case of retries, we are not likely to get the same instance of DefaultOutputCommitterContainer;
+ * In fact, retries may be due to container failure so even static variables won't work.*/
+ private List<HiveLock> locks = null;
+
+ private final ScheduledExecutorService scheduler;
+ //are we guaranteed that commit/abort will be called on the same instance when there is
+ //no retry? Are we even guaranteed the same container? Presumably there is no point
+ //having > 1 container if there are no retries.
+ private final Heartbeater heartbeater;
+ public TransactionContext(final JobContext context) throws IOException {
+ hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
+ //set 'nostrict' so that we can generate the SQL and QueryPlan to acquire locks
+ //we don't actually run the query, so if user's job is not configured properly
+ //wrt Hive config, it will fail at runtime
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nostrict");
+ if(!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY) ||
+ //the 2nd condition is there because if concurrency is enabled but DummyTxnManager is used,
+ //it tries to use Zookeeper which requires additional set up
+ !DbTxnManager.class.getName().equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER))) {//site-wide property
+ txnManager = null;
+ heartbeater = null;
+ scheduler = null;
+ LOG.debug("TransactionContext(disabled) " + makeWaterMark(context));
+ return;
+ }
+ try {
+ txnManager = (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(hiveConf);
+ txnManager.setJobid(context.getJobID().toString());
+ //this is crucial so that it can restore its state if necessary
+ heartbeater = new Heartbeater(txnManager, context.getConfiguration(), context.getJobID().toString());
+ scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread t = new Thread(runnable, "HCatTxHbTh" + context.getJobID());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ }
+ catch(LockException ex) {
+ throw new IOException(ERR_MSG + ex.getMessage(), ex);
+ }
+ LOG.debug("TransactionContext(enabled) " + makeWaterMark(context), new Exception("stack trace"));
+ }
+ /**
+ * Semantically, this is the BEGIN TRANSACTION method. This must be called from
+ * {@link org.apache.hadoop.mapreduce.OutputCommitter#setupJob(org.apache.hadoop.mapreduce.JobContext)},
+ * this will be called once per job attempt.
+ * @throws IOException
+ */
+ public void onSetupJob(final JobContext jobContext) throws IOException {
+ LOG.debug("onSetupJob() " + makeWaterMark(jobContext), new Exception("stack trace"));
+ //called once per job attempt (mapreduce API)
+ if(txnManager == null) {
+ return;//locking disabled
+ }
+ int attemptId = jobContext.getConfiguration().getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+ if(attemptId > 0) {
+ /*so now we know it's being retried - strictly speaking we don't know anything else
+ so we don't know if locks were acquired or lock acquisition, for example, failed
+ Nor do we know if we've started the heartbeat thread. For example, could the
+ old instance be around and still beating? Too many heartbeats can overwhelm the DB.
+
+ so if we assume that retry only happens because container failed (i.e. JVM) there is no
+ issue as acquireLocks() is idempotent (at least that was the agreement with Alan)
+ Also, if context has no InputJobInfo/OutputJobInfo, acquire on TM does nothing -
+ we need this for HCatReader/Writer recovery
+
+ If retry could mean that some logical container failed, i.e. the object is still around
+ we don't want to start new heartbeat threads... On the other hand, it seems silly that
+ JT/AM would keep old instances around, so we can expect them to be GC'd thus finalize should kill the
+ HB thread.... (even though it's not guaranteed to be called)
+
+ You could also imagine that AM is doing something really stupid and creating multiple
+ instances of this class and calling different methods on different instances.
+
+ I suppose the worst would be if it creates an instance and calls setup(), then nukes that instance of the OC,
+ then creates another to call commit(). This means we'd have to make the scheduler static.
+ Of course the AM could always spawn a new JVM for each instance of the OC, in which case there is not much we can do.
+
+
+ */
+ LOG.debug("attemptId=" + attemptId + makeWaterMark(jobContext));
+ }
+ if(!acquireLocks2(jobContext)) {
+ //no locks acquired, don't bother with heartbeat
+ return;
+ }
+ //use 'delay' rather than 'rate' since we don't know how long a beat may take, and we don't
+ //want to make the beat interval too high, so that the Heartbeater config controls the actual
+ //frequency of beats
+ scheduler.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ heartbeater.heartbeat();
+ } catch (IOException ex) {
+ //don't re-throw here - it will kill the scheduled task; hope that ping failure is transient
+ LOG.error("Failed to send heartbeat to Transaction Manager: " + ex.getMessage() +
+ " Transaction may timeout: " + makeWaterMark(jobContext), ex);
+ }
+ }
+ }, 10L, 5L, TimeUnit.SECONDS);
+ }
+
+ private boolean acquireLocks2(JobContext context) throws IOException {
+ String userName = context.getConfiguration().get("mapreduce.job.user.name");
+ String inputInfo = context.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+ String outputInfo = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ if(inputInfo == null && outputInfo == null) {
+ return false;
+ }
+ InputJobInfo inputJobInfo = null;
+ if(inputInfo != null) {
+ inputJobInfo = (InputJobInfo) HCatUtil.deserialize(inputInfo);
+ }
+ OutputJobInfo outputJobInfo = null;
+ if(outputInfo != null) {
+ outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outputInfo);
+ }
+ isReadOnlyTx = inputJobInfo != null && outputJobInfo == null;
+ try {
+ if(!isReadOnlyTx) {
+ txnManager.openTxn(userName);
+ }
+ txnManager.acquireLocks(inputJobInfo, outputJobInfo, userName);
+ }
+ catch(LockException ex) {
+ wrapAndThrow(ex);
+ }
+ return true;
+ }
+ /**
+ * Semantically, this is the ABORT TRANSACTION method. This must be called from
+ * {@link org.apache.hadoop.mapreduce.OutputCommitter#abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}.
+ * @throws IOException
+ */
+ public void onAbortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ LOG.debug("onAbortJob(state=" + state + ") " + makeWaterMark(jobContext));
+ //may be called multiple times
+ if(txnManager == null) {
+ return;//locking disabled
+ }
+ preCleanUp();
+ if(isReadOnlyTx) {
+ return;//HiveTxnManager#openTxn() is only called for write TX
+ }
+ try {
+ txnManager.rollbackTxn();
+ }
+ catch(LockException ex) {
+ wrapAndThrow(ex);
+ }
+ cleanup(jobContext);
+ }
+ /**
+ * Semantically, this is the COMMIT TRANSACTION method. This must be called from
+ * {@link org.apache.hadoop.mapreduce.OutputCommitter#commitJob(org.apache.hadoop.mapreduce.JobContext)},
+ * which is guaranteed to be called exactly once (if job is successful)
+ */
+ public void onCommitJob(JobContext jobContext) throws IOException {
+ LOG.debug("onCommitJob()" + makeWaterMark(jobContext));
+ //called at most once
+ if(txnManager == null) {
+ return;//locking disabled
+ }
+ preCleanUp();
+ if(isReadOnlyTx) {
+ return;//HiveTxnManager#openTxn() is only called for write TX
+ }
+ try {
+ txnManager.commitTxn();
+ }
+ catch(LockException ex) {
+ wrapAndThrow(ex);
+ }
+ cleanup(jobContext);
+ }
+ private static void wrapAndThrow(LockException ex) throws IOException {
+ throw new IOException(ERR_MSG + ex.getMessage(), ex);
+ }
+ private void addLocks(List<HiveLock> newLocks) {
+ if(locks == null) {
+ locks = newLocks;
+ return;
+ }
+ locks.addAll(newLocks);
+ }
+ /**
+ * stop heartbeat thread
+ */
+ private void preCleanUp() {
+ if(scheduler == null || scheduler.isTerminated()) {
+ return;
+ }
+ try {
+ scheduler.shutdown();
+ if (!scheduler.awaitTermination(10L, TimeUnit.SECONDS)) {
+ LOG.warn("Could not cancel heartbeat for 10 seconds. Proceeding to terminate.");
+ }
+ }
+ catch(InterruptedException ex) {
+ LOG.warn("Interrupted trying to shutdown heartbeat");
+ }
+ scheduler.shutdownNow();
+ }
+ private static boolean sanityCheck(JobContext context) {
+ //sanity check: if the job is using HCatOutputFormat (or a subclass) but no OutputJobInfo
+ //is present, this is most likely an error; HCatOutputFormat only makes sense with
+ //OutputJobInfo...
+ try {
+ Class<? extends OutputFormat<?,?>> of = context.getOutputFormatClass();
+ if(of != null && HCatOutputFormat.class.isAssignableFrom(of)) {
+ LOG.warn("OutputFormat=" + HCatOutputFormat.class + " but no "
+ + OutputJobInfo.class.getName() + " found in " + HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ }
+ return true;
+ }
+ catch(ClassNotFoundException ex) {
+ LOG.warn(StringUtils.stringifyException(ex));
+ }
+ return false;
+ }
+ private void cleanup(JobContext context) {
+ LOG.warn("cleanup()" + makeWaterMark(context));
+ if(locks != null && isReadOnlyTx) {
+ try {
+ txnManager.getLockManager().releaseLocks(locks);
+ }
+ catch(LockException ex) {
+ LOG.warn(StringUtils.stringifyException(ex));
+ }
+ }
+ //is this OK or are we leaking resources when there are retries, etc?
+ txnManager.closeTxnManager();
+ isTxClosed = true;
+ }
+ private String makeWaterMark(JobContext context) {
+ return "JobID=" + context.getJobID() + " this=" + System.identityHashCode(this) + " thread=" + Thread.currentThread().getId();
+ }
+
+ /**
+ * The MR framework creates multiple instances of this object (even when there are no retries of the job),
+ * but doesn't necessarily call all/any methods on each instance. This is an attempt not to
+ * leak TxnManager resources.
+ * @throws Throwable
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if(scheduler != null && !scheduler.isTerminated()) {
+ scheduler.shutdownNow();
+ }
+ if(txnManager != null && !isTxClosed) {
+ //don't do this - it will kill the Tx, so if job is retried and an old instance of OutputCommitter gets
+ //GC'd, it may kill TX prematurely.
+ //txnManager.closeTxnManager();
+ }
+ }
+ /*The flow looks like this:
+
+Read case (or no transaction):
+1. acquire locks
+2. do your stuff
+3. release locks
+
+Write (read/write) case:
+1. open transaction
+2. acquire locks, associating the locks with the transaction
+3. do your stuff
+4. commit or rollback
+*/
+}
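To make the contract in the class Javadoc concrete, here is a sketch of an OutputCommitter for a job that reads via HCatInputFormat but writes through a non-HCatalog OutputFormat; it delegates the job-level lifecycle calls to TransactionContext as required. The class name is hypothetical and a real job would also delegate to its actual committer.

import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hive.hcatalog.mapreduce.TransactionContext;

public class NonHCatJobCommitterSketch extends OutputCommitter {
  private TransactionContext txnCtx;

  @Override
  public void setupJob(JobContext context) throws IOException {
    txnCtx = new TransactionContext(context);
    txnCtx.onSetupJob(context);            // BEGIN TRANSACTION / acquire locks
  }
  @Override
  public void commitJob(JobContext context) throws IOException {
    txnCtx.onCommitJob(context);           // COMMIT TRANSACTION
  }
  @Override
  public void abortJob(JobContext context, JobStatus.State state) throws IOException {
    txnCtx.onAbortJob(context, state);     // ABORT TRANSACTION
  }
  // task-level methods are a no-op in this sketch
  @Override public void setupTask(TaskAttemptContext c) { }
  @Override public boolean needsTaskCommit(TaskAttemptContext c) { return false; }
  @Override public void commitTask(TaskAttemptContext c) { }
  @Override public void abortTask(TaskAttemptContext c) { }
}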
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java
new file mode 100644
index 0000000..6893ae3
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.txn;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.thrift.TException;
+
+import java.util.List;
+
+/**
+ * Database lock manager for HCatalog. Extends
+ * {@link org.apache.hadoop.hive.ql.lockmgr.DbLockManager} with the ability to fetch back its
+ * lock info from the database. This is useful for MR jobs where the OutputCommitter
+ * needs to be retried.
+ */
+public class HCatDbLockManager extends DbLockManager {
+
+ static final private Log LOG = LogFactory.getLog(HCatDbLockManager.class.getName());
+
+
+ private String jobid;
+
+ HCatDbLockManager(HiveMetaStoreClient client, String jobid) {
+ super(client, true);
+ this.jobid = jobid;
+ }
+
+ @Override
+ public List<HiveLock> getLocks(boolean verifyTablePartitions, boolean fetchData)
+ throws LockException {
+ if (locks == null || locks.size() == 0) {
+ reconstructLockInfo();
+ }
+ return super.getLocks(verifyTablePartitions, fetchData);
+ }
+
+ private void reconstructLockInfo() throws LockException {
+ // We need to find the transaction associated with this client. Usually this means we are in
+ // a retry state and thus we no longer know our transaction id.
+ try {
+ List<ShowLocksResponseElement> lockList = client.showLocks().getLocks();
+ for (ShowLocksResponseElement lock : lockList) {
+ if (lock.getUser().contains('-' + jobid)) {
+ locks.add(new DbHiveLock(lock.getLockid()));
+ LOG.debug("Recovering lock with id " + lock.getLockid() + " from metastore");
+ }
+ }
+ } catch (TException e) {
+ throw new LockException("Unable to communicate with metastore to find our locks: "
+ + e.getMessage(), e);
+ }
+
+ }
+}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java
new file mode 100644
index 0000000..2e501e6
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.txn;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.PartInfo;
+import org.apache.thrift.TException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Database transaction manager for HCatalog. Extends
+ * {@link org.apache.hadoop.hive.ql.lockmgr.DbTxnManager} with the ability to fetch back its
+ * transaction info from the database. This is useful for MR jobs where the OutputCommitter
+ * needs to be retried.
+ */
+public class HCatDbTxnManager extends DbTxnManager {
+
+ static final private Log LOG = LogFactory.getLog(HCatDbTxnManager.class.getName());
+
+ // The jobid is appended to the user name so that we can find our transactions in case we need
+ // to reconstruct the list.
+ private String jobid;
+
+ HCatDbTxnManager() {
+ super(true);
+ }
+
+ /**
+ * Set the jobid for this transaction manager. This must be called before any of the other
+ * calls in this class.
+ * @param jobid Hadoop jobid.
+ */
+ public void setJobid(String jobid) {
+ this.jobid = jobid;
+ }
+
+ @Override
+ protected DbLockManager instantiateLockMgr(HiveMetaStoreClient client) {
+ return new HCatDbLockManager(client, jobid);
+ }
+
+ @Override
+ public void openTxn(String user) throws LockException {
+ super.openTxn(buildUserName(user));
+ }
+
+ /**
+ * An HCatalog specific version of acquireLocks.
+ * @param input Input information for this job
+ * @param output Output information for this job
+ * @param user Name of the user
+ */
+ public void acquireLocks(InputJobInfo input, OutputJobInfo output, String user)
+ throws LockException {
+ init();
+ getLockManager();
+ LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+
+ LOG.debug("Setting lock request transaction id to " + txnId);
+ rqstBuilder.setTransactionId(txnId).setUser(buildUserName(user));
+
+ if (input != null) {
+ List<PartInfo> parts = input.getPartitions();
+ if (parts != null && parts.size() > 0) {
+ for (PartInfo part : parts) {
+ addInputComponent(rqstBuilder, input, part);
+ }
+ } else {
+ addInputComponent(rqstBuilder, input, null);
+ }
+ }
+
+ if (output != null) {
+ LockComponentBuilder compBuilder = new LockComponentBuilder();
+ compBuilder.setShared() // Shared because we are just inserting
+ .setDbName(output.getDatabaseName())
+ .setTableName(output.getTableName());
+ Map<String, String> partVals = output.getPartitionValues();
+ if (partVals != null && partVals.size() > 0 && !output.isDynamicPartitioningUsed()) {
+ compBuilder.setPartitionName(FileUtils.makePartName(
+ new ArrayList<String>(partVals.keySet()), new ArrayList<String>(partVals.values())));
+ }
+ LockComponent component = compBuilder.build();
+ LOG.debug("Adding lock component to lock request " + component.toString());
+ rqstBuilder.addLockComponent(component);
+ }
+ List<HiveLock> locks = lockMgr.lock(rqstBuilder.build());
+ if(LOG.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder("Locks acquired: ");
+ for(HiveLock hl : locks) {
+ sb.append(hl).append(" ");
+ }
+ sb.append(" input=").append(input == null ? "no" : "yes").append(" output=").append(output == null ? "no": "yes");
+ sb.append(addWaterMark());
+ LOG.debug(sb.toString());
+ }
+ }
+
+ @Override
+ public void commitTxn() throws LockException {
+ if (txnId < 1) reconstructTxnInfo();
+ super.commitTxn();
+ }
+
+ @Override
+ public void rollbackTxn() throws LockException {
+ if (txnId < 1) reconstructTxnInfo();
+ super.rollbackTxn();
+ }
+
+ @Override
+ public void heartbeat() throws LockException {
+ if (txnId < 1) reconstructTxnInfo();
+ super.heartbeat();
+ }
+
+ /**
+ * We override finalize to remove the
+ * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl} implementation which calls
+ * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl#destruct()}. We don't want that
+ * called in the general case when this is finalized because it rolls back transactions and
+ * releases locks. Since the object might be finalized due to OutputCommitter failure,
+ * which will later be retried, we don't want to have destruct called. We can't override
+ * destruct because we still want
+ * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager#closeTxnManager()} to call it and
+ * have the expected effects of rolling back any open transactions and releasing any currently
+ * held locks.
+ * @throws Throwable
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ }
+
+ /**
+ * For testing only.
+ * @return
+ */
+ String getJobid() {
+ return jobid;
+ }
+
+ private void addInputComponent(LockRequestBuilder rqstBuilder, InputJobInfo input,
+ PartInfo part) {
+ LockComponentBuilder compBuilder = new LockComponentBuilder();
+ compBuilder.setShared()
+ .setDbName(input.getDatabaseName())
+ .setTableName(input.getTableName());
+ if (part != null) {
+ Map<String, String> partVals = part.getPartitionValues();
+ compBuilder.setPartitionName(FileUtils.makePartName(new ArrayList<String>(partVals.keySet()),
+ new ArrayList<String>(partVals.values())));
+ }
+ LockComponent component = compBuilder.build();
+ LOG.debug("Adding lock component to lock request " + component.toString());
+ rqstBuilder.addLockComponent(component);
+ }
+
+ private String buildUserName(String user) {
+ return user + '-' + jobid;
+ }
+
+ private void reconstructTxnInfo() throws LockException {
+ // We need to find the transaction associated with this client. Usually this means we are in
+ // a retry state and thus we no longer know our transaction id.
+ try {
+ List<TxnInfo> txns = client.showTxns().getOpen_txns();
+ for (TxnInfo txn : txns) {
+ if (txn.getUser().contains('-' + jobid)) {
+ txnId = txn.getId();
+ return;
+ }
+ }
+ //for read-only txns we don't do open() (as of 6/30/2014) - thus we can have locks w/o txn id
+ //throw new LockException("Unable to find a transaction that matches our jobid: " + jobid);
+ } catch (TException e) {
+ throw new LockException("Unable to communicate with metastore to find our transaction id: "
+ + e.getMessage(), e);
+ }
+ }
+ @Override
+ protected String addWaterMark() throws LockException {
+ return super.addWaterMark() + " jobid=" + getJobid();
+ }
+
+}
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java
index bf2b24e..815cb1f 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java
@@ -96,9 +96,9 @@ protected void setUp() throws Exception {
hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+// hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
hcatConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.varname, "60");
- hcatConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+// hcatConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
clientWH = new Warehouse(hcatConf);
msc = new HiveMetaStoreClient(hcatConf, null);
System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestUseDatabase.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestUseDatabase.java
index 8868623..ecd0e3a 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestUseDatabase.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestUseDatabase.java
@@ -43,7 +43,7 @@ protected void setUp() throws Exception {
HiveConf hcatConf = new HiveConf(this.getClass());
hcatConf.set(ConfVars.PREEXECHOOKS.varname, "");
hcatConf.set(ConfVars.POSTEXECHOOKS.varname, "");
- hcatConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+// hcatConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
hcatDriver = new Driver(hcatConf);
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java
index 63a5548..a6d8145 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java
@@ -234,8 +234,7 @@ public LocalMetaServer() {
HCatSemanticAnalyzer.class.getName());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
- "false");
+// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
}
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java
index ff56234..8e90779 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java
@@ -47,7 +47,7 @@ public static Driver instantiateDriver(MiniCluster cluster) {
}
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
LOG.debug("Hive conf : {}", hiveConf.getAllProperties());
Driver driver = new Driver(hiveConf);
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
index 4f92b68..10ef11c 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
@@ -32,10 +32,8 @@
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
@@ -62,8 +60,8 @@ public void test() throws MetaException, CommandNeedRetryException,
Entry kv = itr.next();
map.put(kv.getKey(), kv.getValue());
}
-
- WriterContext cntxt = runsInMaster(map);
+ HCatWriter writerInMaster = runsInMaster(map);
+ WriterContext cntxt = writerInMaster.prepareWrite();
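+ // the HCatWriter created above on the master is kept and reused later for commit/abort (see commitInMaster below)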
File writeCntxtFile = File.createTempFile("hcat-write", "temp");
writeCntxtFile.deleteOnExit();
@@ -80,9 +78,10 @@ public void test() throws MetaException, CommandNeedRetryException,
ois.close();
runsInSlave(cntxt);
- commit(map, true, cntxt);
+ commitInMaster(writerInMaster, true, cntxt);
- ReaderContext readCntxt = runsInMaster(map, false);
+ HCatReader reader = runsInMaster(map, false);
+ ReaderContext readCntxt = reader.prepareRead();
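+ // likewise, the HCatReader instance is kept so the master can call reader.close(readCntxt) once all splits are read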
File readCntxtFile = File.createTempFile("hcat-read", "temp");
readCntxtFile.deleteOnExit();
@@ -98,23 +97,22 @@ public void test() throws MetaException, CommandNeedRetryException,
for (int i = 0; i < readCntxt.numSplits(); i++) {
runsInSlave(readCntxt, i);
}
+
+ reader.close(readCntxt); // called on the master
+
}
- private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
+ private HCatWriter runsInMaster(Map<String, String> config) throws HCatException {
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withTable("mytbl").build();
- HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
- WriterContext info = writer.prepareWrite();
- return info;
+ return DataTransferFactory.getHCatWriter(entity, config);
}
- private ReaderContext runsInMaster(Map<String, String> config, boolean bogus)
+ private HCatReader runsInMaster(Map<String, String> config, boolean bogus)
throws HCatException {
ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
- HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
- ReaderContext cntxt = reader.prepareRead();
- return cntxt;
+ return DataTransferFactory.getHCatReader(entity, config);
}
private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
@@ -141,16 +139,12 @@ private void runsInSlave(WriterContext context) throws HCatException {
writer.write(new HCatRecordItr());
}
- private void commit(Map<String, String> config, boolean status,
- WriterContext context) throws IOException {
-
- WriteEntity.Builder builder = new WriteEntity.Builder();
- WriteEntity entity = builder.withTable("mytbl").build();
- HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+ private void commitInMaster(HCatWriter writerInMaster, boolean status,
+ WriterContext context) throws IOException {
if (status) {
- writer.commit(context);
+ writerInMaster.commit(context);
} else {
- writer.abort(context);
+ writerInMaster.abort(context);
}
}
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java
index f0ed92c..7d581cb 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.common.HCatUtil;
@@ -60,6 +61,8 @@ public static void setUpTestDataDir() throws Exception {
FileUtil.fullyDelete(f);
}
Assert.assertTrue(new File(TEST_WAREHOUSE_DIR).mkdirs());
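+ // drop and re-create the transaction metadata tables so each test class starts clean;
+ // TxnDbUtil is assumed to run against the embedded Derby store used by these tests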
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
}
@Before
@@ -80,6 +83,8 @@ protected void setUpHiveConf() {
hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ //HIVE-6207
+ //TxnDbUtil.setConfValues(hiveConf);
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
}
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
index 9ddc3a6..ab78e42 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -275,7 +275,7 @@ Job runMRCreate(Map partitionValues,
writeRecords = records;
MapCreate.writeCount = 0;
- Configuration conf = new Configuration();
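+ // copy hiveConf's properties into the job configuration so that settings such as the
+ // configured transaction manager reach the MR job (the latter is the assumed motivation)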
+ Configuration conf = new Configuration(hiveConf);
Job job = new Job(conf, "hcat mapreduce write test");
job.setJarByClass(this.getClass());
job.setMapperClass(HCatMapReduceTest.MapCreate.class);
@@ -319,8 +319,9 @@ Job runMRCreate(Map partitionValues,
// Ensure counters are set when data has actually been read.
if (partitionValues != null) {
- assertTrue(job.getCounters().getGroup("FileSystemCounters")
- .findCounter("FILE_BYTES_READ").getValue() > 0);
+ long value = job.getCounters().getGroup("FileSystemCounters")
+ .findCounter("FILE_BYTES_READ").getValue();
+ assertTrue("Expected FILE_BYTES_READ > 0; got " + value, value > 0);
}
if (!HCatUtil.isHadoop23()) {
@@ -359,7 +360,7 @@ Job runMRCreate(Map partitionValues,
MapRead.readCount = 0;
readRecords.clear();
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(hiveConf);
Job job = new Job(conf, "hcat mapreduce read test");
job.setJarByClass(this.getClass());
job.setMapperClass(HCatMapReduceTest.MapRead.class);
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
index 049de54..387eea7 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
@@ -201,7 +201,7 @@ private static void initializeSetup() throws Exception {
HCatSemanticAnalyzer.class.getName());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
@@ -306,7 +306,7 @@ public void testOutputFormat() throws Throwable {
Path filePath = createInputFile();
FileInputFormat.addInputPath(job, filePath);
- Assert.assertTrue(job.waitForCompletion(true));
+ Assert.assertTrue("JobID=" + job.getJobID() + " failed.", job.waitForCompletion(true));
ArrayList outputs = new ArrayList();
for (String tbl : tableNames) {
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java
index be7134f..0cd3f28 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java
@@ -121,8 +121,7 @@ public static void setup() throws Exception {
HCatSemanticAnalyzer.class.getName());
hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
- "false");
+// hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
msc = new HiveMetaStoreClient(hcatConf, null);
System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestPassProperties.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestPassProperties.java
index f8a0af1..1552d28 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestPassProperties.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestPassProperties.java
@@ -61,7 +61,7 @@ public void Initialize() throws Exception {
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
driver = new Driver(hiveConf);
SessionState.start(new CliSessionState(hiveConf));
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java
new file mode 100644
index 0000000..b9f0653
--- /dev/null
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.txn;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.PartInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests for {@link org.apache.hive.hcatalog.txn.HCatDbTxnManager}
+ */
+public class TestHCatDbTxnManager {
+ private static final Log LOG = LogFactory.getLog(TestHCatDbTxnManager.class.getName());
+
+ private static HiveStorageHandler mockStorageHandler;
+
+ private HiveConf conf;
+ private HCatDbTxnManager txnMgr;
+
+ public TestHCatDbTxnManager() throws Exception {
+ tearDown(); // Just in case there were leftovers from a previous run.
+ }
+
+ @Test
+ public void testSingleTableRead() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable", null);
+ txnMgr.acquireLocks(input, null, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.getLockManager().unlock(locks.get(0));
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testSinglePartitionRead() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(null, "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.getLockManager().unlock(locks.get(0));
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testSinglePartitionMultiValueRead() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(addPartValue(null, "region", "us"), "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.getLockManager().unlock(locks.get(0));
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testMultiPartitionRead() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(addPartition(null, addPartValue(null, "ds", "yesterday")),
+ addPartValue(null, "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(2,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.getLockManager().unlock(locks.get(0));
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testUnlockWithNewTxnMgr() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(null, "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).getLockId()));
+ String jobid = txnMgr.getJobid();
+ // Don't overwrite the existing txnMgr, as dropping the reference may cause it to be finalized
+ // and release its locks, which we don't want
+ HCatDbTxnManager newTxnMgr =
+ (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf);
+ newTxnMgr.setJobid(jobid);
+ newTxnMgr.getLockManager().unlock(locks.get(0));
+ locks = newTxnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testShowWithNewTxnMgr() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(null, "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ String jobid = txnMgr.getJobid();
+ // Don't overwrite the existing txnMgr, as dropping the reference may cause it to be finalized
+ // and release its locks, which we don't want.
+ HCatDbTxnManager newTxnMgr =
+ (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf);
+ newTxnMgr.setJobid(jobid);
+ List<HiveLock> locks = newTxnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ newTxnMgr.getLockManager().unlock(locks.get(0));
+ locks = newTxnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testSingleTableWrite() throws Exception {
+ OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null);
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(null, output, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testSinglePartitionWrite() throws Exception {
+ OutputJobInfo output = OutputJobInfo.create("mydb", "mytable",
+ addPartValue(null, "ds", "today"));
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(null, output, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testDynamicPartitionWrite() throws Exception {
+ OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null);
+ output.setDynamicPartitioningKeys(Arrays.asList(new String[]{"ds"}));
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(null, output, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ // Make sure we're locking the whole table, since this is dynamic partitioning
+ ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
+ List<ShowLocksResponseElement> elms = rsp.getLocks();
+ Assert.assertEquals(1, elms.size());
+ Assert.assertNotNull(elms.get(0).getTablename());
+ Assert.assertNull(elms.get(0).getPartname());
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testRollback() throws Exception {
+ OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null);
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(null, output, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.rollbackTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testReadWrite() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(null, "ds", "today")));
+ OutputJobInfo output = OutputJobInfo.create("mydb", "yourtable",
+ addPartValue(null, "ds", "today"));
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(input, output, "fred");
+ List<HiveLock> locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(2,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Before
+ public void setup() throws Exception {
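+ // each test gets freshly prepared transaction metadata tables and a random UUID as the manager's job id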
+ conf = new HiveConf();
+ TxnDbUtil.setConfValues(conf);
+ TxnDbUtil.prepDb();
+ HiveTxnManager tmpMgr = TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf);
+ Assert.assertTrue(tmpMgr instanceof HCatDbTxnManager);
+ txnMgr = (HCatDbTxnManager)tmpMgr;
+ txnMgr.setJobid(UUID.randomUUID().toString());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TxnDbUtil.cleanDb();
+ }
+
+ private InputJobInfo buildInput(String dbName, String tableName,
+ List