diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 44d9a57..ef8b126 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -732,6 +732,7 @@ HIVE_STATS_DESERIALIZATION_FACTOR("hive.stats.deserialization.factor", (float) 1.0), // Concurrency + //HIVE-6207 HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false), HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"), HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100), @@ -748,6 +749,8 @@ // Transactions HIVE_TXN_MANAGER("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"), + //HIVE-6207 +// "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"), // time after which transactions are declared aborted if the client has // not sent a heartbeat, in seconds. HIVE_TXN_TIMEOUT("hive.txn.timeout", 300), diff --git hcatalog/core/pom.xml hcatalog/core/pom.xml index b5e85cd..9a81e8c 100644 --- hcatalog/core/pom.xml +++ hcatalog/core/pom.xml @@ -71,6 +71,12 @@ jackson-mapper-asl ${jackson.version} + + org.mockito + mockito-all + ${mockito-all.version} + test + diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java index f41336b..36c7e2e 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.transfer.state.StateProvider; @@ -33,14 +34,29 @@ /** * This abstract class is internal to HCatalog and abstracts away the notion of * underlying system from which reads will be done. + * + * Note that as of 0.14 Hive provides ACID semantics, so HCatalog must interact with Hive's + * Transaction Manager; implementations of HCatReader must therefore use + * {@link org.apache.hive.hcatalog.mapreduce.TransactionContext} as documented in this class. + * Furthermore, the HCatReader created on the master must stay around for the duration of the operation. + * More specifically, it is the TransactionContext that needs to stay around so that the transaction + * does not time out. + * The "mapreduce.job.user.name" property must be set. (is this enough for TxManager to restore state?) + * Note that HCatReader and HCatWriter are never part of the same transaction. + * {@link org.apache.hive.hcatalog.data.transfer.ReaderContext} can be used to restore HCatReader + * on the master in case of master crash/restart. The restored instance can then be used to close + * the transaction which was started before the crash. Note that the restart must happen quickly to + * make sure that the transaction does not time out. */ @InterfaceAudience.Public @InterfaceStability.Evolving public abstract class HCatReader { /** - * This should be called at master node to obtain {@link ReaderContext} which - * then should be serialized and sent to slave nodes.
+ * This should be called at master node to obtain {@link org.apache.hive.hcatalog.data.transfer.ReaderContext} + * which then should be serialized and sent to slave nodes. Implementations of this method + * must ensure to call {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onSetupJob(org.apache.hadoop.mapreduce.JobContext)} + * to acquire appropriate locks on resources being read. * * @return {@link ReaderContext} * @throws HCatException @@ -56,6 +72,13 @@ public abstract Iterator read() throws HCatException; /** + * This must be called on master node once reading is complete. Implementations of this method + * must ensure to call {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onCommitJob(org.apache.hadoop.mapreduce.JobContext)} + * to release resources. + * @throws HCatException + */ + public abstract void close(ReaderContext ctx) throws HCatException; + /** * This constructor will be invoked by {@link DataTransferFactory} at master * node. Don't use this constructor. Instead, use {@link DataTransferFactory} * @@ -80,6 +103,9 @@ protected HCatReader(final Configuration config, StateProvider sp) { this.sp = sp; } + protected HCatReader(final ReaderContext ctx) { + //todo: not finished + } protected ReadEntity re; // This will be null at slaves. protected Configuration conf; protected ReaderContext info; @@ -95,4 +121,12 @@ private HCatReader(final Map config) { this.conf = conf; } + /** + * TODO: move this to different package; probably move to Shim class + * and make the seeds static and then auto-increment + * @return + */ + public static JobID createJobID() { + return new JobID(Long.toString((long)(Math.random() * Long.MAX_VALUE)), (int)(Math.random() * 10000)); + } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatWriter.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatWriter.java index da6ad5b..8709934 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatWriter.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatWriter.java @@ -34,6 +34,8 @@ * This abstraction is internal to HCatalog. This is to facilitate writing to * HCatalog from external systems. Don't try to instantiate this directly. * Instead, use {@link DataTransferFactory} + * + * @see org.apache.hive.hcatalog.data.transfer.HCatReader */ @InterfaceAudience.Public @InterfaceStability.Evolving @@ -46,6 +48,9 @@ /** * External system should invoke this method exactly once from a master node. + * Implementations of this method must ensure to call + * {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onSetupJob(org.apache.hadoop.mapreduce.JobContext)} + * to acquire appropriate locks on resources being read. * * @return {@link WriterContext} This should be serialized and sent to slave * nodes to construct HCatWriter there. @@ -65,7 +70,9 @@ public abstract void write(final Iterator recordItr) /** * This method should be called at master node. Primary purpose of this is to - * do metadata commit. + * do metadata commit. Implementations of this method must ensure to call + * {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onCommitJob(org.apache.hadoop.mapreduce.JobContext)} + * to release resources. * * @throws {@link HCatException} */ @@ -73,9 +80,11 @@ public abstract void write(final Iterator recordItr) /** * This method should be called at master node. Primary purpose of this is to - * do cleanups in case of failures. + * do cleanups in case of failures. 
Implementations of this method must ensure to call + * {@link org.apache.hive.hcatalog.mapreduce.TransactionContext#onAbortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)} + * to release resources. * - * @throws {@link HCatException} * + * @throws {@link HCatException} */ public abstract void abort(final WriterContext context) throws HCatException; diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java index edf3654..c501d0a 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java @@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.data.transfer; +import org.apache.hadoop.mapreduce.JobID; + import java.io.Externalizable; @@ -27,6 +29,8 @@ * to the slaves. The contents of the class are opaque to the client. This * interface extends {@link java.io.Externalizable} so that implementing * classes can be serialized using standard Java mechanisms. + * + * */ public interface ReaderContext extends Externalizable { @@ -37,5 +41,5 @@ * @return number of splits */ public int numSplits(); - + public JobID getJobID(); } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java index bd47c35..96bfe12 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java @@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.data.transfer; +import org.apache.hadoop.mapreduce.JobID; + import java.io.Externalizable; /** @@ -29,5 +31,5 @@ * make it available to slaves to prepare for writes. 
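To make the reader contract described above concrete, here is a minimal master-side read sketch, modeled on the updated TestReaderWriter later in this patch; the driver class name, table name, config map, and temp-file path are illustrative assumptions, not part of the patch:

    import java.io.FileOutputStream;
    import java.io.ObjectOutputStream;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hive.hcatalog.data.transfer.*;

    public class ReadFlowSketch {                                  // hypothetical driver class
      public static void main(String[] args) throws Exception {
        Map<String, String> config = new HashMap<String, String>(); // hive-site style overrides, illustrative
        ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
        HCatReader masterReader = DataTransferFactory.getHCatReader(entity, config);
        ReaderContext ctx = masterReader.prepareRead();            // onSetupJob(): read locks acquired here

        // ReaderContext is Externalizable; serialize it for the slaves (and optionally persist it
        // so a restarted master can recover it and still end the transaction before it times out)
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("/tmp/hcat-read-ctx"));
        oos.writeObject(ctx);
        oos.close();

        // on each slave: DataTransferFactory.getHCatReader(ctx, slaveNumber).read() returns the records

        // back on the master, after all slaves finish, on the same instance that called prepareRead():
        masterReader.close(ctx);                                   // onCommitJob(): locks released, transaction ended
      }
    }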
*/ public interface WriterContext extends Externalizable { - + public JobID getJobID(); } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java index 8669ded..d23e311 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java @@ -24,10 +24,12 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -39,6 +41,7 @@ import org.apache.hive.hcatalog.data.transfer.ReaderContext; import org.apache.hive.hcatalog.data.transfer.state.StateProvider; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hive.hcatalog.mapreduce.TransactionContext; /** * This reader reads via {@link HCatInputFormat} @@ -47,6 +50,7 @@ public class HCatInputFormatReader extends HCatReader { private InputSplit split; + private TransactionContext txnCtx = null; public HCatInputFormatReader(ReaderContext context, int slaveNumber, StateProvider sp) { @@ -58,6 +62,16 @@ public HCatInputFormatReader(ReadEntity info, Map config) { super(info, config); } + /** + * this is meant to be used on recovery of Master node.... + * not finished + * @param context + */ + public HCatInputFormatReader(ReaderContext context) { + super(((ReaderContextImpl)context).getConf(), null); + info = context; + } + @Override public ReaderContext prepareRead() throws HCatException { try { @@ -65,9 +79,12 @@ public ReaderContext prepareRead() throws HCatException { HCatInputFormat hcif = HCatInputFormat.setInput( job, re.getDbName(), re.getTableName(), re.getFilterString()); ReaderContextImpl cntxt = new ReaderContextImpl(); + JobContext jc = ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(), cntxt.getJobID()); cntxt.setInputSplits(hcif.getSplits( - ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(), null))); + jc)); cntxt.setConf(job.getConfiguration()); + txnCtx = new TransactionContext(jc); + txnCtx.onSetupJob(jc); return cntxt; } catch (IOException e) { throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e); @@ -93,6 +110,28 @@ public ReaderContext prepareRead() throws HCatException { return new HCatRecordItr(rr); } + @Override + public void close(ReaderContext ctx) throws HCatException { + if(txnCtx == null) { + //so now what? Did the client die and we have retry? + //in this case we should restore TransactionContext + //Or user error and close is called before prepareRead() + //or worse, it's called on slave, in which case we really + //don't want to call TransactionContext + return; + } + try { + //what's the right way to create context? 
+ txnCtx.onCommitJob(createJobContext(conf, ctx)); + } + catch(IOException ex) { + throw new HCatException("Failed to commit read transaction in close(): " + ex.getMessage()); + } + } + private static JobContext createJobContext(Configuration conf, ReaderContext ctx) throws IOException { + return ShimLoader.getHadoopShims().getHCatShim().createJobContext( + new Job(conf).getConfiguration(), ctx.getJobID()); + } private static class HCatRecordItr implements Iterator { private RecordReader curRecReader; diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java index cdbd829..67ff843 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java @@ -35,6 +35,7 @@ import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.transfer.HCatReader; import org.apache.hive.hcatalog.data.transfer.HCatWriter; import org.apache.hive.hcatalog.data.transfer.WriteEntity; import org.apache.hive.hcatalog.data.transfer.WriterContext; @@ -48,6 +49,7 @@ */ public class HCatOutputFormatWriter extends HCatWriter { + private volatile OutputCommitter oc; public HCatOutputFormatWriter(WriteEntity we, Map config) { super(we, config); } @@ -56,25 +58,36 @@ public HCatOutputFormatWriter(WriterContext cntxt, StateProvider sp) { super(((WriterContextImpl)cntxt).getConf(), sp); } + public HCatOutputFormatWriter(WriterContext ctx) { + //this is meant to be called from DataTransferFactory when HCatWriter + //on Master is restored after a crash; + //todo: finish this + super(((WriterContextImpl)ctx).getConf(), null); + //it should recreate OutputCommitter and call setupJob() to + //restart heartbeat thread + } @Override public WriterContext prepareWrite() throws HCatException { OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(), we.getTableName(), we.getPartitionKVs()); Job job; + WriterContextImpl cntxt = new WriterContextImpl(); try { job = new Job(conf); + //make sure that all OutputCommitter calls use the same job id + job.setJobID(cntxt.getJobID()); HCatOutputFormat.setOutput(job, jobInfo); HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job.getConfiguration())); HCatOutputFormat outFormat = new HCatOutputFormat(); outFormat.checkOutputSpecs(job); - outFormat.getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext( - job.getConfiguration(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID())).setupJob(job); + oc = outFormat.getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext( + job.getConfiguration(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID())); + oc.setupJob(job); } catch (IOException e) { throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e); } catch (InterruptedException e) { throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e); } - WriterContextImpl cntxt = new WriterContextImpl(); cntxt.setConf(job.getConfiguration()); return cntxt; } @@ -126,16 +139,10 @@ public void write(Iterator recordItr) throws HCatException { public void commit(WriterContext context) throws HCatException { WriterContextImpl cntxtImpl = (WriterContextImpl)context; try { - new HCatOutputFormat().getOutputCommitter( -
ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext( - cntxtImpl.getConf(), - ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID())) - .commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext( - cntxtImpl.getConf(), null)); + oc.commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext( + cntxtImpl.getConf(), context.getJobID())); } catch (IOException e) { throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e); - } catch (InterruptedException e) { - throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e); } } @@ -143,16 +150,10 @@ public void commit(WriterContext context) throws HCatException { public void abort(WriterContext context) throws HCatException { WriterContextImpl cntxtImpl = (WriterContextImpl)context; try { - new HCatOutputFormat().getOutputCommitter( - ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext( - cntxtImpl.getConf(), - ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID())) - .abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext( - cntxtImpl.getConf(), null), State.FAILED); + oc.abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext( + cntxtImpl.getConf(), context.getJobID()), State.FAILED); } catch (IOException e) { throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e); - } catch (InterruptedException e) { - throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e); } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java index 89628a4..102076d 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java @@ -28,11 +28,13 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hive.hcatalog.data.transfer.HCatReader; import org.apache.hive.hcatalog.data.transfer.ReaderContext; import org.apache.hive.hcatalog.mapreduce.HCatSplit; /** - * This class contains the list of {@link InputSplit}s obtained + * This class contains the list of {@link org.apache.hadoop.mapreduce.InputSplit}s obtained * at master node and the configuration. 
*/ class ReaderContextImpl implements ReaderContext, Configurable { @@ -40,10 +42,12 @@ private static final long serialVersionUID = -2656468331739574367L; private List splits; private Configuration conf; + private final JobID jobID; public ReaderContextImpl() { this.splits = new ArrayList(); this.conf = new Configuration(); + this.jobID = HCatReader.createJobID(); } void setInputSplits(final List splits) { @@ -60,6 +64,11 @@ public int numSplits() { } @Override + public JobID getJobID() { + return jobID; + } + + @Override public Configuration getConf() { return conf; } @@ -76,6 +85,7 @@ public void writeExternal(ObjectOutput out) throws IOException { for (InputSplit split : splits) { ((HCatSplit) split).write(out); } + jobID.write(out); } @Override @@ -88,5 +98,6 @@ public void readExternal(ObjectInput in) throws IOException, split.readFields(in); splits.add(split); } + jobID.readFields(in); } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java index 088d258..835a14e 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java @@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hive.hcatalog.data.transfer.HCatReader; import org.apache.hive.hcatalog.data.transfer.WriterContext; /** @@ -38,9 +40,11 @@ private static final long serialVersionUID = -5899374262971611840L; private Configuration conf; + private final JobID jobID; public WriterContextImpl() { conf = new Configuration(); + jobID = HCatReader.createJobID(); } @Override @@ -56,11 +60,17 @@ public void setConf(final Configuration config) { @Override public void writeExternal(ObjectOutput out) throws IOException { conf.write(out); + jobID.write(out); } @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { conf.readFields(in); + jobID.readFields(in); + } + @Override + public JobID getJobID() { + return jobID; } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/package.html hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/package.html new file mode 100644 index 0000000..b85bd8c --- /dev/null +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/package.html @@ -0,0 +1,18 @@ + + + + + + + + +HCatalog DataTransfer API + + + +This API provides a way to read/write Hive tables w/o creating any MapReduce jobs. 
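As a companion to the read sketch earlier, here is a minimal master-side write sketch under this patch, again modeled on the updated TestReaderWriter; the driver class and table name are illustrative. Note that the same HCatWriter instance that called prepareWrite() must also call commit() or abort(), since HCatOutputFormatWriter now holds the OutputCommitter it created during prepareWrite():

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hive.hcatalog.common.HCatException;
    import org.apache.hive.hcatalog.data.transfer.*;

    public class WriteFlowSketch {                                 // hypothetical driver class
      public static void main(String[] args) throws Exception {
        Map<String, String> config = new HashMap<String, String>();
        WriteEntity entity = new WriteEntity.Builder().withTable("mytbl").build();
        HCatWriter masterWriter = DataTransferFactory.getHCatWriter(entity, config);
        WriterContext ctx = masterWriter.prepareWrite();           // setupJob(): locks acquired, heartbeat started

        // serialize ctx and ship it to the slaves; each slave obtains its own HCatWriter from the
        // context via DataTransferFactory and writes its HCatRecord iterator

        try {
          masterWriter.commit(ctx);   // commitJob(): metadata commit, then COMMIT TRANSACTION
        } catch (HCatException e) {
          masterWriter.abort(ctx);    // abortJob(): ABORT TRANSACTION / release locks
        }
      }
    }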
+For details see HCatReader/Writer + + + diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java index cead40d..6ac835e 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java @@ -66,6 +66,7 @@ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { @Override public void setupJob(JobContext context) throws IOException { getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context)); + super.setupJob(context); } @Override @@ -76,12 +77,14 @@ public void setupTask(TaskAttemptContext context) throws IOException { @Override public void abortJob(JobContext jobContext, State state) throws IOException { getBaseOutputCommitter().abortJob(HCatMapRedUtil.createJobContext(jobContext), state); + super.abortJob(jobContext, state); cleanupJob(jobContext); } @Override public void commitJob(JobContext jobContext) throws IOException { getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext)); + super.commitJob(jobContext); cleanupJob(jobContext); } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java index 491da14..f6fc6a3 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java @@ -146,6 +146,7 @@ public void setupJob(JobContext context) throws IOException { getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context)); } // in dynamic usecase, called through FileRecordWriterContainer + super.setupJob(context); } @Override @@ -199,6 +200,7 @@ public void abortJob(JobContext jobContext, State state) throws IOException { if (!src.equals(tblPath)){ fs.delete(src, true); } + super.abortJob(jobContext, state); } finally { cancelDelegationTokens(jobContext); } @@ -246,6 +248,7 @@ public void commitJob(JobContext jobContext) throws IOException { } } } + super.commitJob(jobContext); // Commit has succeeded (since no exceptions have been thrown.) // Safe to cancel delegation tokens now. diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java index 360e77b..bea15e0 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java @@ -90,6 +90,10 @@ private InputJobInfo(String databaseName, this.properties = properties == null ? new Properties() : properties; } + public String getQualifiedTableName() { + return databaseName + "." + tableName; + } + /** * Gets the value of databaseName * @return the databaseName @@ -183,4 +187,14 @@ private void readObject(ObjectInputStream ois) new ObjectInputStream(new InflaterInputStream(ois)); partitions = (List)partInfoReader.readObject(); } + + + /** + * Built to enable testing with this class in other packages. 
+ * @param partitions + */ + public void setPartitionsForTesting(List partitions) { + this.partitions = partitions; + } + } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputCommitterContainer.java index 2125e14..3154424 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputCommitterContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputCommitterContainer.java @@ -20,21 +20,26 @@ package org.apache.hive.hcatalog.mapreduce; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.OutputCommitter; +import java.io.IOException; + /** * This class will contain an implementation of an OutputCommitter. * See {@link OutputFormatContainer} for more information about containers. */ abstract class OutputCommitterContainer extends OutputCommitter { private final org.apache.hadoop.mapred.OutputCommitter committer; + private final TransactionContext transactionContext; /** * @param context current JobContext * @param committer OutputCommitter that this instance will contain */ - public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter committer) { + public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter committer) throws IOException { this.committer = committer; + transactionContext = new TransactionContext(context); } /** @@ -43,5 +48,16 @@ public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.Out public OutputCommitter getBaseOutputCommitter() { return committer; } - + @Override + public void setupJob(JobContext context) throws IOException { + transactionContext.onSetupJob(context); + } + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { + transactionContext.onAbortJob(jobContext, state); + } + @Override + public void commitJob(JobContext jobContext) throws IOException { + transactionContext.onCommitJob(jobContext); + } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java index d50f43b..7cdd633 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java @@ -96,7 +96,9 @@ private OutputJobInfo(String databaseName, this.partitionValues = partitionValues; this.properties = new Properties(); } - + public String getQualifiedTableName() { + return databaseName + "." + tableName; + } /** * @return the posOfPartCols */ diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TransactionContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TransactionContext.java new file mode 100644 index 0000000..f37eb4c --- /dev/null +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TransactionContext.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.mapreduce; + +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Heartbeater; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.txn.HCatDbTxnManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * In order for MR jobs using HCatalog to support/play nice with Hive's ACID semantics, they must + * interact with Hive's Transaction Manager. This class implements the usual BEGIN TRANSACTION and + * (COMMIT|ABORT TRANSACTION) methods. It's the responsibility of the MR job to call these. + * In order to support fire and forget MR job submission, this class is designed to be invoked from + * the OutputCommitter of the job. + * Jobs using HCatOutputFormat will automatically make use of this. Jobs using HCatInputFormat and + * anything other than HCatOutputFormat must make sure that the OutputCommitter of the job calls + * {@link #onSetupJob(org.apache.hadoop.mapreduce.JobContext)}, + * {@link #onCommitJob(org.apache.hadoop.mapreduce.JobContext)} and + * {@link #onAbortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}. + * A more detailed design note can be found in + * HIVE-6207. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class TransactionContext { + /*Some implementation notes: + For the happy path, we can expect a single instance of OutputCommitter on which all lifecycle methods + are called. This instance is called from ApplicationMaster. In case of container failures + (or other conditions that cause job retries), we may end up initiating a TX from one instance of + TransactionContext and committing/aborting from another. + (is this actually true? When a job is retried, do all lifecycle methods get called again?) + + Use this to determine whether it's a retry: + return context.getConfiguration().getInt( + MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + + * todo: + * So far this implements the "happy path". Need to do some work to properly handle the retry of the job.
+ * + * Current design requires that jobs using HCatInputFormat but not HCatOutputFormat are modified in + * order to work with Hive's Transaction Manager + * how do we know if this is retried? Can we modify Configuration? + * think about retry logic; where should acquired locks be stored. + * What about transaction id? + * todo: Heartbeat (use org.apache.hadoop.hive.ql.exec.Heartbeater) + * Dependencies + * 1. ensure that TxManager is re-entrant + * 2. ensure that TxManager is stateless, i.e. we can acquire locks in one instance and release in another. + * */ + private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); + private static final String ERR_MSG = "Transaction Manager error: "; + private final HCatDbTxnManager txnManager; + /** + * Use this to prevent making multiple close/commit/etc. calls to + * TxnManager since some of the methods here may be called more than once. + */ + private volatile boolean isTxClosed = false; + private final HiveConf hiveConf; + //this probably needs to be stored in Context as well + private boolean isReadOnlyTx = false; + /*should this be stored in Context? + * In case of retries, we are not likely to get the same instance of DefaultOutputCommitterContainer; + * In fact, retries may be due to container failure so even static variables won't work.*/ + private List locks = null; + + private final ScheduledExecutorService scheduler; + //are we guaranteed that commit/abort will be called on the same instance when there is + //no retry? Are we even guaranteed the same container? Presumably there is no point + //having > 1 container if there are no retries. + private final Heartbeater heartbeater; + public TransactionContext(final JobContext context) throws IOException { + hiveConf = HCatUtil.getHiveConf(context.getConfiguration()); + //set 'nostrict' so that we can generate the SQL and QueryPlan to acquire locks + //we don't actually run the query, so if user's job is not configured properly + //wrt Hive config, it will fail at runtime + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nostrict"); + if(!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY) || + //the 2nd condition is there because if concurrency is enabled but DummyTxnManager is used, + //it tries to use Zookeeper which requires additional set up + !DbTxnManager.class.getName().equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER))) {//site-wide property + txnManager = null; + heartbeater = null; + scheduler = null; + LOG.debug("TransactionContext(disabled) " + makeWaterMark(context)); + return; + } + try { + txnManager = (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(hiveConf); + txnManager.setJobid(context.getJobID().toString()); + //this is crucial so that it can restore its state if necessary + heartbeater = new Heartbeater(txnManager, context.getConfiguration(), context.getJobID().toString()); + scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + Thread t = new Thread(runnable, "HCatTxHbTh" + context.getJobID()); + t.setDaemon(true); + return t; + } + }); + } + catch(LockException ex) { + throw new IOException(ERR_MSG + ex.getMessage(), ex); + } + LOG.debug("TransactionContext(enabled) " + makeWaterMark(context), new Exception("stack trace")); + } + /** + * Semantically, this is the BEGIN TRANSACTION method.
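As an aside, a sketch of the client-side configuration that the constructor above checks before enabling any transactional behavior; the property keys and the DbTxnManager class name are the ones referenced in this patch, while the user name and job name are illustrative:

    Configuration conf = new Configuration();
    conf.setBoolean("hive.support.concurrency", true);                              // HIVE_SUPPORT_CONCURRENCY
    conf.set("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); // HIVE_TXN_MANAGER must be DbTxnManager
    conf.set("mapreduce.job.user.name", "etl-user");                                // read by acquireLocks2(); illustrative value
    Job job = new Job(conf, "acid-aware job");                                      // jobs built from this conf get a live TransactionContext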
This must be called from + * {@link org.apache.hadoop.mapreduce.OutputCommitter#setupJob(org.apache.hadoop.mapreduce.JobContext)}, + * this will be called once per job attempt. + * @throws IOException + */ + public void onSetupJob(final JobContext jobContext) throws IOException { + LOG.debug("onSetupJob() " + makeWaterMark(jobContext), new Exception("stack trace")); + //called once per job attempt (mapreduce API) + if(txnManager == null) { + return;//locking disabled + } + int attemptId = jobContext.getConfiguration().getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + if(attemptId > 0) { + /*so now we know it's being retried - strictly speaking we don't know anything else + so we don't know if locks were acquired or lock acquisition, for example, failed + Nor do we know if we've started the heartbeat thread. For example, could the + old instance be around and still beating? Too many heartbeats can overwhelm the DB. + + so if we assume that retry only happens because container failed (i.e. JVM) there is no + issue as acquireLocks() is idempotent (at least that was the agreement with Alan) + Also, if context has no InputJobInfo/OutputJobInfo, acquire on TM does nothing - + we need this for HCatReader/Writer recovery + + If retry could mean that some logical container failed, i.e. the object is still around + we don't want to start new heartbeat threads... On the other hand, it seems silly that + JT/AM would keep old instances around, so we can expect them to be GC'd thus finalize should kill the + HB thread.... (even though it's not guaranteed to be called) + + You could also imagine that AM is doing something really stupid and creating multiple + instances of this class and calling different methods on different instances. + + I suppose the worst would be if it creates an instance and calls setup. the nukes that instance of OC. + then creates another to call commit. This means we'd have to make scheduler static. 
of course AM could always spawn a new JVM for each instance of OC, in which case we'd be out of luck + + + */ + LOG.debug("attemptId=" + attemptId + makeWaterMark(jobContext)); + } + if(!acquireLocks2(jobContext)) { + //no locks acquired, don't bother with heartbeat + return; + } + //use 'delay' rather than 'rate' since we don't know how long a beat may take and we don't + //want to make the beat interval too high, so that the Heartbeater config controls the actual + //frequency of beats + scheduler.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + heartbeater.heartbeat(); + } catch (IOException ex) { + //don't re-throw here - it will kill the scheduled task; hope that ping failure is transient + LOG.error("Failed to send heartbeat to Transaction Manager: " + ex.getMessage() + + " Transaction may time out: " + makeWaterMark(jobContext), ex); + } + } + }, 10L, 5L, TimeUnit.SECONDS); + } + + private boolean acquireLocks2(JobContext context) throws IOException { + String userName = context.getConfiguration().get("mapreduce.job.user.name"); + String inputInfo = context.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); + String outputInfo = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); + if(inputInfo == null && outputInfo == null) { + return false; + } + InputJobInfo inputJobInfo = null; + if(inputInfo != null) { + inputJobInfo = (InputJobInfo) HCatUtil.deserialize(inputInfo); + } + OutputJobInfo outputJobInfo = null; + if(outputInfo != null) { + outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outputInfo); + } + isReadOnlyTx = inputJobInfo != null && outputJobInfo == null; + try { + if(!isReadOnlyTx) { + txnManager.openTxn(userName); + } + txnManager.acquireLocks(inputJobInfo, outputJobInfo, userName); + } + catch(LockException ex) { + wrapAndThrow(ex); + } + return true; + } + /** + * Semantically, this is the ABORT TRANSACTION method. This must be called from + * {@link org.apache.hadoop.mapreduce.OutputCommitter#abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}. + * @throws IOException + */ + public void onAbortJob(JobContext jobContext, JobStatus.State state) throws IOException { + LOG.debug("onAbortJob(state=" + state + ") " + makeWaterMark(jobContext)); + //may be called multiple times + if(txnManager == null) { + return;//locking disabled + } + preCleanUp(); + if(isReadOnlyTx) { + return;//HiveTxnManager#openTxn() is only called for write TX + } + try { + txnManager.rollbackTxn(); + } + catch(LockException ex) { + wrapAndThrow(ex); + } + cleanup(jobContext); + } + /** + * Semantically, this is the COMMIT TRANSACTION method.
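For jobs that read through HCatInputFormat but write through something other than HCatOutputFormat, the class javadoc above requires the job's OutputCommitter to invoke these hooks itself. A minimal sketch of such a committer follows; TxnAwareCommitter is a hypothetical class name, the delegation mirrors what OutputCommitterContainer does elsewhere in this patch, and it assumes user code may call TransactionContext even though it is annotated @InterfaceAudience.Private:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hive.hcatalog.mapreduce.TransactionContext;

    public class TxnAwareCommitter extends FileOutputCommitter {
      private final TransactionContext txnCtx;

      public TxnAwareCommitter(Path outputPath, TaskAttemptContext ctx) throws IOException {
        super(outputPath, ctx);
        txnCtx = new TransactionContext(ctx);     // a TaskAttemptContext is also a JobContext
      }
      @Override
      public void setupJob(JobContext jc) throws IOException {
        super.setupJob(jc);
        txnCtx.onSetupJob(jc);                    // BEGIN TRANSACTION: acquire locks, start heartbeat
      }
      @Override
      public void commitJob(JobContext jc) throws IOException {
        super.commitJob(jc);
        txnCtx.onCommitJob(jc);                   // COMMIT TRANSACTION: release locks
      }
      @Override
      public void abortJob(JobContext jc, JobStatus.State state) throws IOException {
        super.abortJob(jc, state);
        txnCtx.onAbortJob(jc, state);             // ABORT TRANSACTION
      }
    }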
This must be called from + * {@link org.apache.hadoop.mapreduce.OutputCommitter#commitJob(org.apache.hadoop.mapreduce.JobContext)}, + * which is guaranteed to be called exactly once (if job is successful) + */ + public void onCommitJob(JobContext jobContext) throws IOException { + LOG.debug("onCommitJob()" + makeWaterMark(jobContext)); + //called at most once + if(txnManager == null) { + return;//locking disabled + } + preCleanUp(); + if(isReadOnlyTx) { + return;//HiveTxnManager#openTxn() is only called for write TX + } + try { + txnManager.commitTxn(); + } + catch(LockException ex) { + wrapAndThrow(ex); + } + cleanup(jobContext); + } + private static void wrapAndThrow(LockException ex) throws IOException { + throw new IOException(ERR_MSG + ex.getMessage(), ex); + } + private void addLocks(List newLocks) { + if(locks == null) { + locks = newLocks; + } + locks.addAll(newLocks); + } + /** + * stop heartbeat thread + */ + private void preCleanUp() { + if(scheduler == null || scheduler.isTerminated()) { + return; + } + try { + scheduler.shutdown(); + if (!scheduler.awaitTermination(10L, TimeUnit.SECONDS)) { + LOG.warn("Could not cancel heartbeat for 10 seconds. Proceeding to terminate."); + } + } + catch(InterruptedException ex) { + LOG.warn("Interrupted trying to shutdown heartbeat"); + } + scheduler.shutdownNow(); + } + private static boolean sanityCheck(JobContext context) { + //sanity check: if OutputJobInfo is present but the job is not using HCatOutputFormat (or + // a subclass) + //this must be an error; HCatOutputFormat only makes sense with OutputJobInfo... + try { + Class> of = context.getOutputFormatClass(); + if(of != null && HCatOutputFormat.class.isAssignableFrom(of)) { + LOG.warn("OutputFormat=" + HCatOutputFormat.class + " but no " + + OutputJobInfo.class.getName() + " found in " + HCatConstants.HCAT_KEY_OUTPUT_INFO); + } + return true; + } + catch(ClassNotFoundException ex) { + LOG.warn(StringUtils.stringifyException(ex)); + } + return false; + } + private void cleanup(JobContext context) { + LOG.warn("cleanup()" + makeWaterMark(context)); + if(locks != null && isReadOnlyTx) { + try { + txnManager.getLockManager().releaseLocks(locks); + } + catch(LockException ex) { + LOG.warn(StringUtils.stringifyException(ex)); + } + } + //is this OK or are we leaking resources when there are retries, etc? + txnManager.closeTxnManager(); + isTxClosed = true; + } + private String makeWaterMark(JobContext context) { + return "JobID=" + context.getJobID() + " this=" + System.identityHashCode(this) + " thread=" + Thread.currentThread().getId(); + } + + /** + * MR framework creates multiple instance of this object (even when there are no retires of the job), + * but doesn't necessarily call call all/any methods on each instance. This is an attempt not to + * leak TxnManager resources. + * @throws Throwable + */ + @Override + protected void finalize() throws Throwable { + super.finalize(); + if(scheduler != null && !scheduler.isTerminated()) { + scheduler.shutdownNow(); + } + if(txnManager != null && !isTxClosed) { + //don't do this - it will kill the Tx, so if job is retried and an old instance of OutputCommitter gets + //GC'd, it may kill TX prematurely. + //txnManager.closeTxnManager(); + } + } + /*The flow looks like this: + +Read case (or no transaction): +1. acquire locks +2. do your stuff +3. release locks + +Write (read/write) case: +1. open transaction +2. acquire locks, associating the locks with the transaction +3. do your stuff +4. 
commit or rollback +*/ +} diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java new file mode 100644 index 0000000..6893ae3 --- /dev/null +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.txn; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.thrift.TException; + +import java.util.List; + +/** + * Database lock manager for HCatalog. Extends + * {@link org.apache.hadoop.hive.ql.lockmgr.DbLockManager} with the ability to fetch back its + * lock info from the database. This is useful for MR jobs where the OutputCommitter + * needs to be retried. + */ +public class HCatDbLockManager extends DbLockManager { + + static final private Log LOG = LogFactory.getLog(HCatDbLockManager.class.getName()); + + + private String jobid; + + HCatDbLockManager(HiveMetaStoreClient client, String jobid) { + super(client, true); + this.jobid = jobid; + } + + @Override + public List getLocks(boolean verifyTablePartitions, boolean fetchData) + throws LockException { + if (locks == null || locks.size() == 0) { + reconstructLockInfo(); + } + return super.getLocks(verifyTablePartitions, fetchData); + } + + private void reconstructLockInfo() throws LockException { + // We need to find the transaction associated with this client. Usually this means we are in + // a retry state and thus we no longer know our transaction id. 
+ try { + List lockList = client.showLocks().getLocks(); + for (ShowLocksResponseElement lock : lockList) { + if (lock.getUser().contains('-' + jobid)) { + locks.add(new DbHiveLock(lock.getLockid())); + LOG.debug("Recovering lock with id " + lock.getLockid() + " from metastore"); + } + } + } catch (TException e) { + throw new LockException("Unable to communicate with metastore to find our locks: " + + e.getMessage(), e); + } + + } +} diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java new file mode 100644 index 0000000..2e501e6 --- /dev/null +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.txn; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hive.hcatalog.mapreduce.InputJobInfo; +import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hive.hcatalog.mapreduce.PartInfo; +import org.apache.thrift.TException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Database transaction manager for HCatalog. Extends + * {@link org.apache.hadoop.hive.ql.lockmgr.DbTxnManager} with the ability to fetch back its + * transaction info from the database. This is useful for MR jobs where the OutputCommitter + * needs to be retried. + */ +public class HCatDbTxnManager extends DbTxnManager { + + static final private Log LOG = LogFactory.getLog(HCatDbTxnManager.class.getName()); + + // The jobid is appended to the user name so that we can find our transactions in case we need + // to reconstruct the list. + private String jobid; + + HCatDbTxnManager() { + super(true); + } + + /** + * Set the jobid for this transaction manager. This must be called before any of the other + * calls in this class. + * @param jobid Hadoop jobid. 
+ */ + public void setJobid(String jobid) { + this.jobid = jobid; + } + + @Override + protected DbLockManager instantiateLockMgr(HiveMetaStoreClient client) { + return new HCatDbLockManager(client, jobid); + } + + @Override + public void openTxn(String user) throws LockException { + super.openTxn(buildUserName(user)); + } + + /** + * An HCatalog specific version of acquireLocks. + * @param input Input information for this job + * @param output Output information for this job + * @param user Name of the user + */ + public void acquireLocks(InputJobInfo input, OutputJobInfo output, String user) + throws LockException { + init(); + getLockManager(); + LockRequestBuilder rqstBuilder = new LockRequestBuilder(); + + LOG.debug("Setting lock request transaction id to " + txnId); + rqstBuilder.setTransactionId(txnId).setUser(buildUserName(user)); + + if (input != null) { + List parts = input.getPartitions(); + if (parts != null && parts.size() > 0) { + for (PartInfo part : parts) { + addInputComponent(rqstBuilder, input, part); + } + } else { + addInputComponent(rqstBuilder, input, null); + } + } + + if (output != null) { + LockComponentBuilder compBuilder = new LockComponentBuilder(); + compBuilder.setShared() // Shared because we are just inserting + .setDbName(output.getDatabaseName()) + .setTableName(output.getTableName()); + Map partVals = output.getPartitionValues(); + if (partVals != null && partVals.size() > 0 && !output.isDynamicPartitioningUsed()) { + compBuilder.setPartitionName(FileUtils.makePartName( + new ArrayList(partVals.keySet()), new ArrayList(partVals.values()))); + } + LockComponent component = compBuilder.build(); + LOG.debug("Adding lock component to lock request " + component.toString()); + rqstBuilder.addLockComponent(component); + } + List locks = lockMgr.lock(rqstBuilder.build()); + if(LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder("Locks acquired: "); + for(HiveLock hl : locks) { + sb.append(hl).append(" "); + } + sb.append(" input=").append(input == null ? "no" : "yes").append(" output=").append(output == null ? "no": "yes"); + sb.append(addWaterMark()); + LOG.debug(sb.toString()); + } + } + + @Override + public void commitTxn() throws LockException { + if (txnId < 1) reconstructTxnInfo(); + super.commitTxn(); + } + + @Override + public void rollbackTxn() throws LockException { + if (txnId < 1) reconstructTxnInfo(); + super.rollbackTxn(); + } + + @Override + public void heartbeat() throws LockException { + if (txnId < 1) reconstructTxnInfo(); + super.heartbeat(); + } + + /** + * We override finalize to remove the + * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl} implmentation which calls + * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl#destruct()}. We don't want that + * called in the general case when this is finalized because it rolls back transactions and + * releases locks. Since the object might be finalized due to OutputCommitter failure, + * which will later be retried, we don't want to have destruct called. We can't override + * destruct because we still want + * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager#closeTxnManager()} to call it and + * have the expected effects of rolling back any open transactions and releasing any currently + * held locks. + * @throws Throwable + */ + @Override + protected void finalize() throws Throwable { + } + + /** + * For testing only. 
+ * @return + */ + String getJobid() { + return jobid; + } + + private void addInputComponent(LockRequestBuilder rqstBuilder, InputJobInfo input, + PartInfo part) { + LockComponentBuilder compBuilder = new LockComponentBuilder(); + compBuilder.setShared() + .setDbName(input.getDatabaseName()) + .setTableName(input.getTableName()); + if (part != null) { + Map partVals = part.getPartitionValues(); + compBuilder.setPartitionName(FileUtils.makePartName(new ArrayList(partVals.keySet()), + new ArrayList(partVals.values()))); + } + LockComponent component = compBuilder.build(); + LOG.debug("Adding lock component to lock request " + component.toString()); + rqstBuilder.addLockComponent(component); + } + + private String buildUserName(String user) { + return user + '-' + jobid; + } + + private void reconstructTxnInfo() throws LockException { + // We need to find the transaction associated with this client. Usually this means we are in + // a retry state and thus we no longer know our transaction id. + try { + List txns = client.showTxns().getOpen_txns(); + for (TxnInfo txn : txns) { + if (txn.getUser().contains('-' + jobid)) { + txnId = txn.getId(); + return; + } + } + //for read-only txns we don't do open() (as of 6/30/2014) - thus we can have locks w/o txn id + //throw new LockException("Unable to find a transaction that matches our jobid: " + jobid); + } catch (TException e) { + throw new LockException("Unable to communicate with metastore to find our transaction id: " + + e.getMessage(), e); + } + } + @Override + protected String addWaterMark() throws LockException { + return super.addWaterMark() + " jobid=" + getJobid(); + } + +} diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java index bf2b24e..815cb1f 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java @@ -96,9 +96,9 @@ protected void setUp() throws Exception { hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName()); hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hcatConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.varname, "60"); - hcatConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); +// hcatConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); clientWH = new Warehouse(hcatConf); msc = new HiveMetaStoreClient(hcatConf, null); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestUseDatabase.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestUseDatabase.java index 8868623..ecd0e3a 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestUseDatabase.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestUseDatabase.java @@ -43,7 +43,7 @@ protected void setUp() throws Exception { HiveConf hcatConf = new HiveConf(this.getClass()); hcatConf.set(ConfVars.PREEXECHOOKS.varname, ""); hcatConf.set(ConfVars.POSTEXECHOOKS.varname, ""); - hcatConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hcatConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); 
hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName()); hcatDriver = new Driver(hcatConf); diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java index 63a5548..a6d8145 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java @@ -234,8 +234,7 @@ public LocalMetaServer() { HCatSemanticAnalyzer.class.getName()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, - "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); } diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java index ff56234..8e90779 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java @@ -47,7 +47,7 @@ public static Driver instantiateDriver(MiniCluster cluster) { } hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); LOG.debug("Hive conf : {}", hiveConf.getAllProperties()); Driver driver = new Driver(hiveConf); diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java index 4f92b68..10ef11c 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java @@ -32,10 +32,8 @@ import java.util.Map; import java.util.Map.Entry; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.transfer.DataTransferFactory; import org.apache.hive.hcatalog.data.transfer.HCatReader; @@ -62,8 +60,8 @@ public void test() throws MetaException, CommandNeedRetryException, Entry kv = itr.next(); map.put(kv.getKey(), kv.getValue()); } - - WriterContext cntxt = runsInMaster(map); + HCatWriter writerInMaster = runsInMaster(map); + WriterContext cntxt = writerInMaster.prepareWrite(); File writeCntxtFile = File.createTempFile("hcat-write", "temp"); writeCntxtFile.deleteOnExit(); @@ -80,9 +78,10 @@ public void test() throws MetaException, CommandNeedRetryException, ois.close(); runsInSlave(cntxt); - commit(map, true, cntxt); + commitInMaster(writerInMaster, true, cntxt); - ReaderContext readCntxt = runsInMaster(map, false); + HCatReader reader = runsInMaster(map, false); + ReaderContext readCntxt = reader.prepareRead(); File readCntxtFile = File.createTempFile("hcat-read", "temp"); readCntxtFile.deleteOnExit(); @@ -98,23 +97,22 @@ public void test() throws 
MetaException, CommandNeedRetryException, for (int i = 0; i < readCntxt.numSplits(); i++) { runsInSlave(readCntxt, i); } + + reader.close(readCntxt);//called in Master + } - private WriterContext runsInMaster(Map config) throws HCatException { + private HCatWriter runsInMaster(Map config) throws HCatException { WriteEntity.Builder builder = new WriteEntity.Builder(); WriteEntity entity = builder.withTable("mytbl").build(); - HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); - WriterContext info = writer.prepareWrite(); - return info; + return DataTransferFactory.getHCatWriter(entity, config); } - private ReaderContext runsInMaster(Map config, boolean bogus) + private HCatReader runsInMaster(Map config, boolean bogus) throws HCatException { ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build(); - HCatReader reader = DataTransferFactory.getHCatReader(entity, config); - ReaderContext cntxt = reader.prepareRead(); - return cntxt; + return DataTransferFactory.getHCatReader(entity, config); } private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException { @@ -141,16 +139,12 @@ private void runsInSlave(WriterContext context) throws HCatException { writer.write(new HCatRecordItr()); } - private void commit(Map config, boolean status, - WriterContext context) throws IOException { - - WriteEntity.Builder builder = new WriteEntity.Builder(); - WriteEntity entity = builder.withTable("mytbl").build(); - HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); + private void commitInMaster(HCatWriter writerInMaster, boolean status, + WriterContext context) throws IOException { if (status) { - writer.commit(context); + writerInMaster.commit(context); } else { - writer.abort(context); + writerInMaster.abort(context); } } diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java index f0ed92c..7d581cb 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.common.HCatUtil; @@ -60,6 +61,8 @@ public static void setUpTestDataDir() throws Exception { FileUtil.fullyDelete(f); } Assert.assertTrue(new File(TEST_WAREHOUSE_DIR).mkdirs()); + TxnDbUtil.cleanDb(); + TxnDbUtil.prepDb(); } @Before @@ -80,6 +83,8 @@ protected void setUpHiveConf() { hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, ""); hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, ""); hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + //HIVE-6207 + //TxnDbUtil.setConfValues(hiveConf); hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR); } diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java index 9ddc3a6..ab78e42 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java @@ -275,7 +275,7 @@ Job 
runMRCreate(Map partitionValues, writeRecords = records; MapCreate.writeCount = 0; - Configuration conf = new Configuration(); + Configuration conf = new Configuration(hiveConf); Job job = new Job(conf, "hcat mapreduce write test"); job.setJarByClass(this.getClass()); job.setMapperClass(HCatMapReduceTest.MapCreate.class); @@ -319,8 +319,9 @@ Job runMRCreate(Map partitionValues, // Ensure counters are set when data has actually been read. if (partitionValues != null) { - assertTrue(job.getCounters().getGroup("FileSystemCounters") - .findCounter("FILE_BYTES_READ").getValue() > 0); + long value = job.getCounters().getGroup("FileSystemCounters") + .findCounter("FILE_BYTES_READ").getValue(); + assertTrue("Expected FILE_BYTES_READ > 0; got " + value, value > 0); } if (!HCatUtil.isHadoop23()) { @@ -359,7 +360,7 @@ Job runMRCreate(Map partitionValues, MapRead.readCount = 0; readRecords.clear(); - Configuration conf = new Configuration(); + Configuration conf = new Configuration(hiveConf); Job job = new Job(conf, "hcat mapreduce read test"); job.setJarByClass(this.getClass()); job.setMapperClass(HCatMapReduceTest.MapRead.class); diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java index 049de54..387eea7 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java @@ -201,7 +201,7 @@ private static void initializeSetup() throws Exception { HCatSemanticAnalyzer.class.getName()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); @@ -306,7 +306,7 @@ public void testOutputFormat() throws Throwable { Path filePath = createInputFile(); FileInputFormat.addInputPath(job, filePath); - Assert.assertTrue(job.waitForCompletion(true)); + Assert.assertTrue("JobID=" + job.getJobID() + " failed.", job.waitForCompletion(true)); ArrayList outputs = new ArrayList(); for (String tbl : tableNames) { diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java index be7134f..0cd3f28 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java @@ -121,8 +121,7 @@ public static void setup() throws Exception { HCatSemanticAnalyzer.class.getName()); hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, - "false"); +// hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); msc = new HiveMetaStoreClient(hcatConf, null); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestPassProperties.java 
hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestPassProperties.java index f8a0af1..1552d28 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestPassProperties.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestPassProperties.java @@ -61,7 +61,7 @@ public void Initialize() throws Exception { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); driver = new Driver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java new file mode 100644 index 0000000..b9f0653 --- /dev/null +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.hcatalog.txn; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hive.hcatalog.mapreduce.InputJobInfo; +import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hive.hcatalog.mapreduce.PartInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Tests for {@link org.apache.hive.hcatalog.txn.HCatDbTxnManager} + */ +public class TestHCatDbTxnManager { + static final private Log LOG = LogFactory.getLog(TestHCatDbTxnManager.class.getName()); + + private static HiveStorageHandler mockStorageHandler; + + private HiveConf conf; + private HCatDbTxnManager txnMgr; + + public TestHCatDbTxnManager() throws Exception { + tearDown(); // Just in case there were leftovers from a previous run.
+ } + + @Test + public void testSingleTableRead() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", null); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testSinglePartitionRead() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(null, "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testSinglePartitionMultiValueRead() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(addPartValue(null, "region", "us"), "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testMultiPartitionRead() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(addPartition(null, addPartValue(null, "ds", "yeserday")), + addPartValue(null, "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(2, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testUnlockWithNewTxnMgr() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(null, "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).getLockId())); + String jobid = txnMgr.getJobid(); + // Don't overwrite the existing txnMgr, as they may cause it to call finalize and release its + // locks, which we don't want + HCatDbTxnManager newTxnMgr = + (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf); + newTxnMgr.setJobid(jobid); + newTxnMgr.getLockManager().unlock(locks.get(0)); + locks = newTxnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testShowWithNewTxnMgr() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(null, "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + String jobid = 
txnMgr.getJobid(); + // Don't overwrite the existing txnMgr, as they may cause it to call finalize and release its + // locks, which we don't want. + HCatDbTxnManager newTxnMgr = + (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf); + newTxnMgr.setJobid(jobid); + List locks = newTxnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + newTxnMgr.getLockManager().unlock(locks.get(0)); + locks = newTxnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testSingleTableWrite() throws Exception { + OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(null, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testSinglePartitionWrite() throws Exception { + OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", + addPartValue(null, "ds", "today")); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(null, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testDynamicPartitionWrite() throws Exception { + OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null); + output.setDynamicPartitioningKeys(Arrays.asList(new String[]{"ds"})); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(null, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + // Make sure we're locking the whole table, since this is dynamic partitioning + ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks(); + List elms = rsp.getLocks(); + Assert.assertEquals(1, elms.size()); + Assert.assertNotNull(elms.get(0).getTablename()); + Assert.assertNull(elms.get(0).getPartname()); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testRollback() throws Exception { + OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(null, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.rollbackTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testReadWrite() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(null, "ds", "today"))); + OutputJobInfo output = OutputJobInfo.create("mydb", "yourtable", + 
addPartValue(null, "ds", "today")); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(input, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(2, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Before + public void setup() throws Exception { + conf = new HiveConf(); + TxnDbUtil.setConfValues(conf); + TxnDbUtil.prepDb(); + HiveTxnManager tmpMgr = TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf); + Assert.assertTrue(tmpMgr instanceof HCatDbTxnManager); + txnMgr = (HCatDbTxnManager)tmpMgr; + txnMgr.setJobid(UUID.randomUUID().toString()); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(); + } + + private InputJobInfo buildInput(String dbName, String tableName, + List> parts) throws Exception { + InputJobInfo input = InputJobInfo.create(dbName, tableName, "whatever", null); + if (parts != null) { + List pis = new ArrayList(parts.size()); + for (Map m : parts) { + PartInfo pi = new PartInfo(null, mockStorageHandler, null, null, null, + null); + pi.setPartitionValues(m); + pis.add(pi); + } + input.setPartitionsForTesting(pis); + } + return input; + } + + private Map addPartValue(Map p, String key, String value) { + if (p == null) p = new HashMap(); + p.put(key, value); + return p; + } + + private List> addPartition(List> list, + Map partVals) { + if (list == null) list = new ArrayList>(); + list.add(partVals); + return list; + } + + @BeforeClass + public static void buildMockStorageHandler() { + mockStorageHandler = Mockito.mock(HiveStorageHandler.class); + final InputFormat mif = Mockito.mock(InputFormat.class); + Mockito.when(mockStorageHandler.getInputFormatClass()).thenAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return mif.getClass(); + } + } + ); + final OutputFormat mof = Mockito.mock(OutputFormat.class); + Mockito.when(mockStorageHandler.getOutputFormatClass()).thenAnswer( + new Answer() { + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return mof.getClass(); + } + } + ); + final SerDe ms = Mockito.mock(SerDe.class); + Mockito.when(mockStorageHandler.getSerDeClass()).thenAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return ms.getClass(); + } + } + ); + } +} diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestE2EScenarios.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestE2EScenarios.java index a4b55c8..973607b 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestE2EScenarios.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestE2EScenarios.java @@ -64,6 +64,7 @@ private static final String TEXTFILE_LOCN = TEST_DATA_DIR + "/textfile"; private static Driver driver; + private HiveConf hiveConf; protected String storageFormat() { return "orc"; @@ -80,10 +81,10 @@ protected void setUp() throws Exception { throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); } - HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - 
hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); driver = new Driver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); @@ -148,8 +149,11 @@ private void pigDump(String tableName) throws IOException { private void copyTable(String in, String out) throws IOException, InterruptedException { - Job ijob = new Job(); - Job ojob = new Job(); + //if we don't pass hiveConf in, the HiveConf inside TransactionManger ends up with + //HIVE_SUPPORT_CONCURRENCY=true, but HIVE_TXN_MANAGER=DummyTxnManager and the lock mgr + //is Zookeeper and this hangs... + Job ijob = new Job(hiveConf, "InputJob"); + Job ojob = new Job(hiveConf, "OutputJob"); HCatInputFormat inpy = new HCatInputFormat(); inpy.setInput(ijob , null, in); HCatOutputFormat oupy = new HCatOutputFormat(); diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java index 82fc8a9..05eab05 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java @@ -145,7 +145,7 @@ public void setup() throws Exception { HiveConf hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); driver = new Driver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java index eadbf20..a3e9cc4 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java @@ -90,7 +90,7 @@ public static void setUpBeforeClass() throws Exception { HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); driver = new Driver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); //props = new Properties(); diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java index 76080f7..b35b04f 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java @@ -83,7 +83,7 @@ protected void setUp() throws Exception { HiveConf hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); 
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); driver = new Driver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); diff --git hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java index 04029ed..6dcedfd 100644 --- hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java +++ hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java @@ -99,8 +99,7 @@ public static void startMetaStoreServer() throws Exception { HCatSemanticAnalyzer.class.getName()); hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, - "false"); +// hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); } diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java index 745aa99..c26fa13 100644 --- itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java @@ -280,7 +280,7 @@ private void setUpMetastore() throws Exception { //is present only in the ql/test directory hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + new File(workDir + "/metastore_db") + ";create=true"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/TestPigHBaseStorageHandler.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/TestPigHBaseStorageHandler.java index f907380..65fb93d 100644 --- itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/TestPigHBaseStorageHandler.java +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/TestPigHBaseStorageHandler.java @@ -74,7 +74,7 @@ public void Initialize() throws Exception { URI fsuri = getFileSystem().getUri(); Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(), getTestDir()); - hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString()); diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/mapreduce/TestSequenceFileReadWrite.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/mapreduce/TestSequenceFileReadWrite.java index f4c3c20..bab23f1 100644 --- 
itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/mapreduce/TestSequenceFileReadWrite.java +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/mapreduce/TestSequenceFileReadWrite.java @@ -74,7 +74,7 @@ public void setup() throws Exception { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); +// hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir); driver = new Driver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 401e639..25f4f0b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -841,7 +841,7 @@ private int recordValidTxns() { /** * Acquire read and write locks needed by the statement. The list of objects to be locked are - * obtained from he inputs and outputs populated by the compiler. The lock acuisition scheme is + * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making * sure that the locks are lexicographically sorted. **/ @@ -961,14 +961,6 @@ private int compileInternal(String command) { synchronized (compileMonitor) { ret = compile(command); } - if (ret != 0) { - try { - releaseLocks(ctx.getHiveLocks()); - } catch (LockException e) { - LOG.warn("Exception in releasing locks. " - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - } return ret; } @@ -1634,5 +1626,7 @@ public void destroy() { public String getErrorMsg() { return errorMessage; } - + public Context getContext() { + return ctx; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java index 7fdb4e7..07c8463 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java @@ -35,17 +35,23 @@ private boolean dontHeartbeat = false; private HiveTxnManager txnMgr; private Configuration conf; + private final String id; - static final private Log LOG = LogFactory.getLog(Heartbeater.class.getName()); + static final private Log LOG = LogFactory.getLog(Heartbeater.class); /** * * @param txnMgr transaction manager for this operation * @param conf Configuration for this operation + * @param id if not null, used to tag log messages */ - public Heartbeater(HiveTxnManager txnMgr, Configuration conf) { + public Heartbeater(HiveTxnManager txnMgr, Configuration conf, String id) { this.txnMgr = txnMgr; this.conf = conf; + this.id = id; + } + public Heartbeater(HiveTxnManager txnMgr, Configuration conf) { + this(txnMgr, conf, null); } /** @@ -56,7 +62,7 @@ public void heartbeat() throws IOException { if (dontHeartbeat) return; if (txnMgr == null) { - LOG.debug("txnMgr null, not heartbeating"); + LOG.debug("txnMgr null, not heartbeating for " + id); dontHeartbeat = true; return; } @@ -66,19 +72,19 @@ public void heartbeat() throws IOException { // but divide by 2 to give us a safety factor. 
heartbeatInterval = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT) * 500; if (heartbeatInterval == 0) { - LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent"); + LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent for " + id); dontHeartbeat = true; - LOG.debug("heartbeat interval 0, not heartbeating"); + LOG.debug("heartbeat interval 0, not heartbeating for " + id); return; } } long now = System.currentTimeMillis(); if (now - lastHeartbeat > heartbeatInterval) { try { - LOG.debug("heartbeating"); + LOG.debug("heartbeating for" + id); txnMgr.heartbeat(); } catch (LockException e) { - LOG.warn("Failed trying to heartbeat " + e.getMessage()); + LOG.warn("Failed trying to heartbeat for " + id + ": " + e.getMessage()); throw new IOException(e); } lastHeartbeat = now; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 535912f..d509a16 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -40,9 +40,9 @@ static final private Log LOG = LogFactory.getLog(CLASS_NAME); private static final long MAX_SLEEP = 15000; + protected Set locks; + protected HiveMetaStoreClient client; private HiveLockManagerCtx context; - private Set locks; - private HiveMetaStoreClient client; private long nextSleep = 50; DbLockManager(HiveMetaStoreClient client) { @@ -50,6 +50,10 @@ this.client = client; } + protected DbLockManager(HiveMetaStoreClient client, boolean fromHcat) { + this(client); + } + @Override public void setContext(HiveLockManagerCtx ctx) throws LockException { context = ctx; @@ -73,7 +77,7 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode, * @param lock lock request * @throws LockException */ - List lock(LockRequest lock) throws LockException { + public List lock(LockRequest lock) throws LockException { try { LOG.debug("Requesting lock"); LockResponse res = client.lock(lock); @@ -174,14 +178,18 @@ public void refresh() { // NOP } - static class DbHiveLock extends HiveLock { + public static class DbHiveLock extends HiveLock { long lockId; - DbHiveLock(long id) { + public DbHiveLock(long id) { lockId = id; } + public long getLockId() { + return lockId; + } + @Override public HiveLockObject getHiveLockObject() { throw new UnsupportedOperationException(); @@ -205,6 +213,19 @@ public boolean equals(Object other) { public int hashCode() { return (int)(lockId % Integer.MAX_VALUE); } + public String toString() { + return "lockId=" + lockId; + } + } + + /** + * Clear the memory of the locks in this object. This won't clear the locks from the database. + * It is for use with + * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).commitTxn} and + * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).rollbackTxn}. 
+ */ + void clearLocalLockRecords() { + locks.clear(); } // Sleep before we send checkLock again, but do it with a back off diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index f2f416e..dcfea3b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -21,7 +21,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.api.*; @@ -45,13 +44,23 @@ static final private String CLASS_NAME = DbTxnManager.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); - private DbLockManager lockMgr = null; - private HiveMetaStoreClient client = null; - private long txnId = 0; + protected DbLockManager lockMgr = null; + protected HiveMetaStoreClient client = null; + protected long txnId = 0; + /** + * Do not instantiate directly, use {@link org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory}. + */ DbTxnManager() { } + /** + * A constructor for extending classes. + * @param fromHCat + */ + protected DbTxnManager(boolean fromHCat) { + } + @Override public void openTxn(String user) throws LockException { init(); @@ -68,7 +77,7 @@ public void openTxn(String user) throws LockException { public HiveLockManager getLockManager() throws LockException { init(); if (lockMgr == null) { - lockMgr = new DbLockManager(client); + lockMgr = instantiateLockMgr(client); } return lockMgr; } @@ -170,6 +179,8 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo compBuilder.setTableName(t.getTableName()); break; + // TODO, I don't think this is right. For dynamic partitioning case (DUMMYPARTITION) we + // should be setting the lock on the whole table I think. case PARTITION: case DUMMYPARTITION: compBuilder.setPartitionName(output.getPartition().getName()); @@ -200,9 +211,10 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo public void commitTxn() throws LockException { if (txnId == 0) { throw new RuntimeException("Attempt to commit before opening a " + - "transaction"); + "transaction." + addWaterMark()); } try { + lockMgr.clearLocalLockRecords(); LOG.debug("Committing txn " + txnId); client.commitTxn(txnId); } catch (NoSuchTxnException e) { @@ -219,13 +231,26 @@ public void commitTxn() throws LockException { } } + protected String addWaterMark() throws LockException{ + StringBuilder sb = new StringBuilder("thId=").append(Thread.currentThread().getId()); + sb.append(" txnId=").append(txnId); + if(lockMgr != null) { + List locks = lockMgr.getLocks(false, false); + sb.append(" locks: "); + for(HiveLock hl : locks) { + sb.append(hl); + } + } + return sb.toString(); + } @Override public void rollbackTxn() throws LockException { if (txnId == 0) { throw new RuntimeException("Attempt to rollback before opening a " + - "transaction"); + "transaction. 
" + addWaterMark()); } try { + lockMgr.clearLocalLockRecords(); LOG.debug("Rolling back txn " + txnId); client.rollbackTxn(txnId); } catch (NoSuchTxnException e) { @@ -304,7 +329,11 @@ protected void destruct() { } } - private void init() throws LockException { + protected DbLockManager instantiateLockMgr(HiveMetaStoreClient client) { + return new DbLockManager(client); + } + + protected void init() throws LockException { if (client == null) { if (conf == null) { throw new RuntimeException("Must call setHiveConf before any other " + diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/TxnManagerFactory.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/TxnManagerFactory.java index 4d616d0..24087d7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/TxnManagerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/TxnManagerFactory.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.lockmgr; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.util.ReflectionUtils; @@ -32,17 +34,34 @@ */ public class TxnManagerFactory { + static final private Log LOG = LogFactory.getLog(TxnManagerFactory.class.getName()); + private static TxnManagerFactory self; + private static TxnManagerFactory hcatSelf; + private boolean inHcat; /** * Get the singleton instance of this factory. + * @param hcat should be set to true if this is an Hcatalog process rather than a Hive client. * @return this factory */ - public static synchronized TxnManagerFactory getTxnManagerFactory() { - if (self == null) { - self = new TxnManagerFactory(); + public static synchronized TxnManagerFactory getTxnManagerFactory(boolean hcat) { + if(hcat) { + if(hcatSelf == null) { + hcatSelf = new TxnManagerFactory(true); + } + return hcatSelf; + } + else { + if (self == null) { + self = new TxnManagerFactory(false); + } + return self; } - return self; + } + + public static TxnManagerFactory getTxnManagerFactory() { + return getTxnManagerFactory(false); } /** @@ -64,6 +83,11 @@ public HiveTxnManager getTxnManager(HiveConf conf) throws if (txnMgrName == null || txnMgrName.isEmpty()) { throw new LockException(ErrorMsg.TXNMGR_NOT_SPECIFIED.getMsg()); } + if (inHcat && txnMgrName.equals(DbTxnManager.class.getName())) { + // This is unfortunate, but I don't see a way around it. I could create an HCatalog + // specific TxnManagerFactory, but I'd end up duplicating a lot of logic. 
+ txnMgrName = "org.apache.hive.hcatalog.txn.HCatDbTxnManager"; + } // Instantiate the chosen transaction manager try { @@ -72,11 +96,13 @@ public HiveTxnManager getTxnManager(HiveConf conf) throws impl.setHiveConf(conf); txnMgr = impl; } catch (ClassNotFoundException e) { + LOG.error("Unable to instantiate " + txnMgrName + ", " + e.getMessage()); throw new LockException(ErrorMsg.TXNMGR_NOT_INSTANTIATED.getMsg()); } return txnMgr; } - private TxnManagerFactory() { + private TxnManagerFactory(boolean hcat) { + inHcat = hcat; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java index 2eb4438..8153dda 100644 --- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java +++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java @@ -51,4 +51,12 @@ public CommandProcessorResponse(int responseCode, String errorMessage, String SQ public String getErrorMessage() { return errorMessage; } public String getSQLState() { return SQLState; } public Schema getSchema() { return resSchema; } + public String toString() { + StringBuilder sb = new StringBuilder("responseCode=").append(responseCode); + if(responseCode != 0) { + sb.append(" errorMessage=").append(errorMessage); + sb.append(" SQLState=").append(SQLState); + } + return sb.toString(); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 98c3cc3..3e0e1dd 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -124,12 +124,13 @@ public void testJoin() throws Exception { public void testSingleWriteTable() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); + txnMgr.openTxn("fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(1, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } @@ -142,12 +143,13 @@ public void testReadWrite() throws Exception { addPartitionInput(t); WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); + txnMgr.openTxn("fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(4, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } @@ -156,12 +158,13 @@ public void testReadWrite() throws Exception { public void testUpdate() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.UPDATE); QueryPlan qp = new MockQueryPlan(this); + txnMgr.openTxn("fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(1, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); 
Assert.assertEquals(0, locks.size()); } @@ -170,12 +173,28 @@ public void testUpdate() throws Exception { public void testDelete() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE); QueryPlan qp = new MockQueryPlan(this); + txnMgr.openTxn("fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(1, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testRollback() throws Exception { + WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE); + QueryPlan qp = new MockQueryPlan(this); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(qp, ctx, "fred"); + List locks = ctx.getHiveLocks(); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); + txnMgr.rollbackTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); }
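(Aside: pulling the TestReaderWriter changes above together, the master-side lifecycle the patch expects from external clients looks roughly like the sketch below. This is a hedged illustration, not code from the patch; the table name "mytbl" and the empty config map are placeholders, and context serialization to slaves plus error handling are elided.)

import java.util.HashMap;
import java.util.Map;

import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;

public class MasterSideFlowSketch {
  public void run() throws HCatException {
    Map<String, String> config = new HashMap<String, String>();

    // Write path: the HCatWriter created on the master must stay alive until commit()/abort(),
    // since with ACID enabled it now owns the transaction and locks taken in prepareWrite().
    WriteEntity we = new WriteEntity.Builder().withTable("mytbl").build();
    HCatWriter writer = DataTransferFactory.getHCatWriter(we, config);
    WriterContext writeCtx = writer.prepareWrite();   // serialize and ship to slaves
    // ... slaves write records via DataTransferFactory.getHCatWriter(writeCtx) ...
    writer.commit(writeCtx);                          // back on the master: commit and release locks

    // Read path: likewise, the HCatReader built on the master calls the new close() once all
    // splits have been consumed, so that its read locks are released.
    ReadEntity re = new ReadEntity.Builder().withTable("mytbl").build();
    HCatReader reader = DataTransferFactory.getHCatReader(re, config);
    ReaderContext readCtx = reader.prepareRead();     // serialize and ship to slaves
    // ... slaves iterate their splits via DataTransferFactory.getHCatReader(readCtx, i) ...
    reader.close(readCtx);                            // back on the master
  }
}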