diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
index a415ac6..6276a55 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
@@ -21,8 +21,6 @@
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hive.hcatalog.data.transfer.impl.HCatInputFormatReader;
 import org.apache.hive.hcatalog.data.transfer.impl.HCatOutputFormatWriter;
 import org.apache.hive.hcatalog.data.transfer.state.DefaultStateProvider;
@@ -56,16 +54,16 @@ public static HCatReader getHCatReader(final ReadEntity re,
    * This should only be called once from every slave node to obtain an instance
    * of {@link HCatReader}.
    *
-   * @param split
-   *          input split obtained at master node
-   * @param config
-   *          configuration obtained at master node
+   * @param context
+   *          reader context obtained at the master node
+   * @param slaveNumber
+   *          which slave this is, determines which part of the read is done
    * @return {@link HCatReader}
    */
-  public static HCatReader getHCatReader(final InputSplit split,
-    final Configuration config) {
+  public static HCatReader getHCatReader(final ReaderContext context,
+    int slaveNumber) {
     // In future, this may examine config to return appropriate HCatReader
-    return getHCatReader(split, config, DefaultStateProvider.get());
+    return getHCatReader(context, slaveNumber, DefaultStateProvider.get());
   }
 
   /**
@@ -73,18 +71,19 @@ public static HCatReader getHCatReader(final InputSplit split,
    * of {@link HCatReader}. This should be called if an external system has some
    * state to provide to HCatalog.
   *
-   * @param split
-   *          input split obtained at master node
-   * @param config
-   *          configuration obtained at master node
+   * @param context
+   *          reader context obtained at the master node
+   * @param slaveNumber
+   *          which slave this is, determines which part of the read is done
    * @param sp
    *          {@link StateProvider}
    * @return {@link HCatReader}
    */
-  public static HCatReader getHCatReader(final InputSplit split,
-    final Configuration config, StateProvider sp) {
+  public static HCatReader getHCatReader(final ReaderContext context,
+    int slaveNumber,
+    StateProvider sp) {
     // In future, this may examine config to return appropriate HCatReader
-    return new HCatInputFormatReader(split, config, sp);
+    return new HCatInputFormatReader(context, slaveNumber, sp);
   }
 
   /**
@@ -131,6 +130,6 @@ public static HCatWriter getHCatWriter(final WriterContext cntxt) {
   public static HCatWriter getHCatWriter(final WriterContext cntxt,
     final StateProvider sp) {
     // In future, this may examine context to return appropriate HCatWriter
-    return new HCatOutputFormatWriter(cntxt.getConf(), sp);
+    return new HCatOutputFormatWriter(cntxt, sp);
   }
 }
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
index e4cf97c..f41336b 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
@@ -95,11 +95,4 @@ private HCatReader(final Map<String, String> config) {
     this.conf = conf;
   }
 
-  public Configuration getConf() {
-    if (null == conf) {
-      throw new IllegalStateException(
-        "HCatReader is not constructed correctly.");
-    }
-    return conf;
-  }
 }
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
index 5abe69c..edf3654 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
@@ -20,70 +20,22 @@
 package org.apache.hive.hcatalog.data.transfer;
 
 import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hive.hcatalog.mapreduce.HCatSplit;
 
 /**
- * This class will contain information of different {@link InputSplit} obtained
- * at master node and configuration. This class implements
- * {@link Externalizable} so it can be serialized using standard java
- * mechanisms.
+ * This read context is obtained by the master node and should be distributed
+ * to the slaves. The contents of the class are opaque to the client. This
+ * interface extends {@link java.io.Externalizable} so that implementing
+ * classes can be serialized using standard Java mechanisms.
  */
-public class ReaderContext implements Externalizable, Configurable {
+public interface ReaderContext extends Externalizable {
 
-  private static final long serialVersionUID = -2656468331739574367L;
-  private List<InputSplit> splits;
-  private Configuration conf;
+  /**
+   * Determine the number of splits available in this {@link ReaderContext}.
+   * The client is not required to have this many slave nodes,
+   * as one slave can be used to read multiple splits.
+   * @return number of splits
+   */
+  public int numSplits();
 
-  public ReaderContext() {
-    this.splits = new ArrayList<InputSplit>();
-    this.conf = new Configuration();
-  }
-
-  public void setInputSplits(final List<InputSplit> splits) {
-    this.splits = splits;
-  }
-
-  public List<InputSplit> getSplits() {
-    return splits;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(final Configuration config) {
-    conf = config;
-  }
-
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    conf.write(out);
-    out.writeInt(splits.size());
-    for (InputSplit split : splits) {
-      ((HCatSplit) split).write(out);
-    }
-  }
-
-  @Override
-  public void readExternal(ObjectInput in) throws IOException,
-    ClassNotFoundException {
-    conf.readFields(in);
-    int numOfSplits = in.readInt();
-    for (int i = 0; i < numOfSplits; i++) {
-      HCatSplit split = new HCatSplit();
-      split.readFields(in);
-      splits.add(split);
-    }
-  }
 }
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
index 5e2135c..bd47c35 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
@@ -20,46 +20,14 @@
 package org.apache.hive.hcatalog.data.transfer;
 
 import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 
 /**
- * This contains information obtained at master node to help prepare slave nodes
- * for writer. This class implements {@link Externalizable} so it can be
- * serialized using standard java mechanisms. Master should serialize it and
+ * This contains information obtained at the master node to be distributed to
+ * slave nodes that will do the writing.
+ * This class implements {@link Externalizable} so it can be
+ * serialized using standard Java mechanisms. The master should serialize it and
  * make it available to slaves to prepare for writes.
  */
-public class WriterContext implements Externalizable, Configurable {
-
-  private static final long serialVersionUID = -5899374262971611840L;
-  private Configuration conf;
-
-  public WriterContext() {
-    conf = new Configuration();
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(final Configuration config) {
-    this.conf = config;
-  }
-
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    conf.write(out);
-  }
+public interface WriterContext extends Externalizable {
 
-  @Override
-  public void readExternal(ObjectInput in) throws IOException,
-    ClassNotFoundException {
-    conf.readFields(in);
-  }
 }
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
index 9e7a548..8669ded 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
@@ -48,10 +48,10 @@
 
   private InputSplit split;
 
-  public HCatInputFormatReader(InputSplit split, Configuration config,
-    StateProvider sp) {
-    super(config, sp);
-    this.split = split;
+  public HCatInputFormatReader(ReaderContext context, int slaveNumber,
+    StateProvider sp) {
+    super(((ReaderContextImpl)context).getConf(), sp);
+    this.split = ((ReaderContextImpl)context).getSplits().get(slaveNumber);
   }
 
   public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
@@ -64,7 +64,7 @@ public ReaderContext prepareRead() throws HCatException {
     Job job = new Job(conf);
     HCatInputFormat hcif = HCatInputFormat.setInput(
       job, re.getDbName(), re.getTableName(), re.getFilterString());
-    ReaderContext cntxt = new ReaderContext();
+    ReaderContextImpl cntxt = new ReaderContextImpl();
     cntxt.setInputSplits(hcif.getSplits(
       ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(), null)));
     cntxt.setConf(job.getConfiguration());
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
index 17d4e38..8d93611 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
@@ -52,8 +52,8 @@ public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
     super(we, config);
   }
 
-  public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
-    super(config, sp);
+  public HCatOutputFormatWriter(WriterContext cntxt, StateProvider sp) {
+    super(((WriterContextImpl)cntxt).getConf(), sp);
   }
 
   @Override
@@ -74,7 +74,7 @@ public WriterContext prepareWrite() throws HCatException {
     } catch (InterruptedException e) {
       throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
     }
-    WriterContext cntxt = new WriterContext();
+    WriterContextImpl cntxt = new WriterContextImpl();
     cntxt.setConf(job.getConfiguration());
     return cntxt;
   }
@@ -124,10 +124,14 @@ public void write(Iterator<HCatRecord> recordItr) throws HCatException {
 
   @Override
   public void commit(WriterContext context) throws HCatException {
+    WriterContextImpl cntxtImpl = (WriterContextImpl)context;
     try {
-      new HCatOutputFormat().getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
-        context.getConf(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
-        .commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(context.getConf(), null));
+      new HCatOutputFormat().getOutputCommitter(
+        ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+          cntxtImpl.getConf(),
+          ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
+        .commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+          cntxtImpl.getConf(), null));
     } catch (IOException e) {
       throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
     } catch (InterruptedException e) {
@@ -137,11 +141,14 @@ public void commit(WriterContext context) throws HCatException {
 
   @Override
   public void abort(WriterContext context) throws HCatException {
+    WriterContextImpl cntxtImpl = (WriterContextImpl)context;
     try {
-      new HCatOutputFormat().getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
-        context.getConf(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
-        .abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
-          context.getConf(), null), State.FAILED);
+      new HCatOutputFormat().getOutputCommitter(
+        ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+          cntxtImpl.getConf(),
+          ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
+        .abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+          cntxtImpl.getConf(), null), State.FAILED);
     } catch (IOException e) {
       throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
     } catch (InterruptedException e) {
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
new file mode 100644
index 0000000..89628a4
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.hive.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * This class contains the list of {@link InputSplit}s obtained
+ * at the master node and the configuration.
+ */
+class ReaderContextImpl implements ReaderContext, Configurable {
+
+  private static final long serialVersionUID = -2656468331739574367L;
+  private List<InputSplit> splits;
+  private Configuration conf;
+
+  public ReaderContextImpl() {
+    this.splits = new ArrayList<InputSplit>();
+    this.conf = new Configuration();
+  }
+
+  void setInputSplits(final List<InputSplit> splits) {
+    this.splits = splits;
+  }
+
+  List<InputSplit> getSplits() {
+    return splits;
+  }
+
+  @Override
+  public int numSplits() {
+    return splits.size();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(final Configuration config) {
+    conf = config;
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    conf.write(out);
+    out.writeInt(splits.size());
+    for (InputSplit split : splits) {
+      ((HCatSplit) split).write(out);
+    }
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException,
+    ClassNotFoundException {
+    conf.readFields(in);
+    int numOfSplits = in.readInt();
+    for (int i = 0; i < numOfSplits; i++) {
+      HCatSplit split = new HCatSplit();
+      split.readFields(in);
+      splits.add(split);
+    }
+  }
+}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
new file mode 100644
index 0000000..088d258
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.data.transfer.impl;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.data.transfer.WriterContext;
+
+/**
+ * This contains information obtained at the master node to help prepare slave
+ * nodes for writing. This class implements {@link Externalizable} so it can be
+ * serialized using standard Java mechanisms. The master should serialize it and
+ * make it available to slaves to prepare for writes.
+ */
+class WriterContextImpl implements WriterContext, Configurable {
+
+  private static final long serialVersionUID = -5899374262971611840L;
+  private Configuration conf;
+
+  public WriterContextImpl() {
+    conf = new Configuration();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(final Configuration config) {
+    this.conf = config;
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    conf.write(out);
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException,
+    ClassNotFoundException {
+    conf.readFields(in);
+  }
+}
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
index f1e893e..4f92b68 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
@@ -95,8 +95,8 @@ public void test() throws MetaException, CommandNeedRetryException,
     readCntxt = (ReaderContext) ois.readObject();
     ois.close();
 
-    for (InputSplit split : readCntxt.getSplits()) {
-      runsInSlave(split, readCntxt.getConf());
+    for (int i = 0; i < readCntxt.numSplits(); i++) {
+      runsInSlave(readCntxt, i);
     }
   }
 
@@ -117,9 +117,9 @@ private ReaderContext runsInMaster(Map<String, String> config, boolean bogus)
     return cntxt;
   }
 
-  private void runsInSlave(InputSplit split, Configuration config) throws HCatException {
-
-    HCatReader reader = DataTransferFactory.getHCatReader(split, config);
+  private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
+
+    HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
     Iterator<HCatRecord> itr = reader.read();
     int i = 1;
     while (itr.hasNext()) {
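
Reviewer note: below is a minimal end-to-end sketch of the reworked read path, written against this patch. The table name "mytbl", the context file name, and the empty config map are illustrative placeholders, not part of the patch; the calls themselves (prepareRead(), ReaderContext.numSplits(), and the new getHCatReader(ReaderContext, int) overload) are the API introduced above.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;

public class ReadPathSketch {
  public static void main(String[] args) throws Exception {
    // Master node: plan the read; the returned context is opaque to the client.
    Map<String, String> config = new HashMap<String, String>();
    ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
    HCatReader masterReader = DataTransferFactory.getHCatReader(entity, config);
    ReaderContext context = masterReader.prepareRead();

    // ReaderContext extends Externalizable, so plain Java serialization is
    // enough to ship it to the slaves (a local file here, for brevity).
    ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("readCntxt"));
    oos.writeObject(context);
    oos.close();

    // Slave node(s): rehydrate the context; slave i reads split i.
    ObjectInputStream ois = new ObjectInputStream(new FileInputStream("readCntxt"));
    ReaderContext slaveContext = (ReaderContext) ois.readObject();
    ois.close();
    for (int slaveNum = 0; slaveNum < slaveContext.numSplits(); slaveNum++) {
      HCatReader reader = DataTransferFactory.getHCatReader(slaveContext, slaveNum);
      Iterator<HCatRecord> records = reader.read();
      while (records.hasNext()) {
        System.out.println(records.next());
      }
    }
  }
}

As in TestReaderWriter, a single process plays every slave role in sequence here; in a real deployment each slave would deserialize the context and read only its own split numbers, which is why numSplits() documents that one slave may take several splits.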
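The write path mirrors this: the master calls prepareWrite() and later commit()/abort() with the same WriterContext, while each slave obtains its writer from the shipped context via getHCatWriter(WriterContext). Again a hedged sketch under the same assumptions; the single placeholder record and the table name are invented for illustration and a real slave would produce records matching the table schema.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;

public class WritePathSketch {
  public static void main(String[] args) throws HCatException {
    // Master node: plan the write and capture the opaque WriterContext.
    Map<String, String> config = new HashMap<String, String>();
    WriteEntity entity = new WriteEntity.Builder().withTable("mytbl").build();
    HCatWriter masterWriter = DataTransferFactory.getHCatWriter(entity, config);
    WriterContext context = masterWriter.prepareWrite();

    // Slave node: the context (shipped via Java serialization, as on the
    // read side) yields a writer bound to the master's job configuration.
    HCatWriter slaveWriter = DataTransferFactory.getHCatWriter(context);
    List<HCatRecord> batch = new ArrayList<HCatRecord>();
    batch.add(new DefaultHCatRecord(2)); // placeholder two-column record
    Iterator<HCatRecord> records = batch.iterator();
    try {
      slaveWriter.write(records);
      // Master node: commit once every slave has finished writing.
      masterWriter.commit(context);
    } catch (HCatException e) {
      // Master node: abort so the OutputCommitter can clean up partial output.
      masterWriter.abort(context);
      throw e;
    }
  }
}

Note the design choice this exercises: because commit() and abort() now take the WriterContext rather than a Configuration, the job configuration never leaks out of the impl package, which is exactly what the ReaderContextImpl/WriterContextImpl downcasts in the patch rely on.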