diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java
index 1458075..8dfe24f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java
@@ -56,8 +56,7 @@ public void run(HookContext hookContext) throws Exception {
     this.run(ss,inputs,outputs,ugi);
   }
 
-  public void run(SessionState sess, Set<ReadEntity> inputs,
-      Set<WriteEntity> outputs, UserGroupInformation ugi)
+  public void run(SessionState sess, Set<ReadEntity> inputs, Set<WriteEntity> outputs, UserGroupInformation ugi)
     throws Exception {
     if (sess.getConf().getBoolean("hive.test.init.phase", false) == true) {
       return;
diff --git streaming/pom.xml streaming/pom.xml
index 96e134c..eae34b4 100644
--- streaming/pom.xml
+++ streaming/pom.xml
@@ -44,6 +44,27 @@
       <artifactId>hive-metastore</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>${hadoop-20S.version}</version>
+    </dependency>
     <dependency>
@@ -52,6 +73,7 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+
   </dependencies>
diff --git streaming/src/java/org/apache/hive/streaming/HiveStreamingEndPoint.java streaming/src/java/org/apache/hive/streaming/HiveStreamingEndPoint.java
index ee55b3a..22f9102 100644
--- streaming/src/java/org/apache/hive/streaming/HiveStreamingEndPoint.java
+++ streaming/src/java/org/apache/hive/streaming/HiveStreamingEndPoint.java
@@ -18,22 +18,322 @@
 
 package org.apache.hive.streaming;
 
-import java.util.HashMap;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
 
 public class HiveStreamingEndPoint {
   public final String nameNodeUri;
   public final String metaStoreUri;
   public final String database;
   public final String table;
-  public final HashMap<String, String> partitionSpec;
+  public final List<String> partitionVals;
 
   public HiveStreamingEndPoint(String nameNodeUri, String metaStoreUri
-                , String database, String table, HashMap<String, String> partitionSpec) {
+                , String database, String table, List<String> partitionVals) {
     this.nameNodeUri = nameNodeUri;
     this.metaStoreUri = metaStoreUri;
     this.database = database;
     this.table = table;
-    this.partitionSpec = partitionSpec;
+    this.partitionVals = partitionVals;
+  }
+
+  public StreamingConnection newConnection(String serdeClassName, boolean useEmbeddedMetastore)
+          throws ConnectionError, InvalidPartition, ClassNotFoundException {
+    return new Connection(this, useEmbeddedMetastore, serdeClassName);
   }
-}
+  private static class Connection implements StreamingConnection {
+    private final HiveConf conf;
+    private final IMetaStoreClient msClient;
+    private final HiveStreamingEndPoint hiveEndPoint;
+    private final LockRequest lockRequest;
+    private final Class serdeClass;
+
+    private Connection(HiveStreamingEndPoint hiveEndPoint, boolean embeddedMetaStore
+                , String serdeClassName)
+            throws ConnectionError, InvalidPartition, ClassNotFoundException {
+      conf = new HiveConf();
+      this.msClient = getMetaStoreClient(embeddedMetaStore);
+      this.hiveEndPoint = hiveEndPoint;
+      this.lockRequest = createLockRequest(hiveEndPoint);
+      serdeClass = Class.forName(serdeClassName);
+    }
+
+    public void close() {
+      msClient.close();
+    }
+
+    public TransactionBatch fetchTransactionBatch(int numTransactions)
+            throws ConnectionError, InvalidPartition, StreamingException {
+      return new StreamingTransactionBatch(numTransactions, msClient, hiveEndPoint, lockRequest, serdeClass, conf);
+    }
+
+    // helper methods
+    private IMetaStoreClient getMetaStoreClient(boolean embeddedMetastore) throws ConnectionError {
+      if(embeddedMetastore) {
+        TxnDbUtil.setConfValues(conf);
+      } else {
+        conf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveEndPoint.metaStoreUri);
+      }
+
+      try {
+        return Hive.get(conf).getMSC();
+      } catch (MetaException e) {
+        throw new ConnectionError("Error connecting to Hive Metastore URI: "
+                + hiveEndPoint.metaStoreUri, e);
+      } catch (HiveException e) {
+        throw new ConnectionError("Error connecting to Hive Metastore URI: "
+                + hiveEndPoint.metaStoreUri, e);
+      }
+    }
+
+    private static LockRequest createLockRequest(HiveStreamingEndPoint hiveEndPoint)
+            throws InvalidPartition {
+      LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+
+      for( String partition : hiveEndPoint.partitionVals ) {
+        rqstBuilder.addLockComponent(new LockComponentBuilder()
+                .setDbName(hiveEndPoint.database)
+                .setTableName(hiveEndPoint.table)
+                .setPartitionName(partition)
+                .setShared()
+                .build());
+      }
+      return rqstBuilder.build();
+    }
+  } // class Connection
+
+//  private static List<String> makePartitionSpecList (
+//          Map<String, String> partitionSpec) throws InvalidPartition {
+//    List<String> result = new ArrayList<String>(partitionSpec.size());
+//    for( String partitionName : partitionSpec.keySet() ) {
+//      String partitionValue = partitionSpec.get(partitionName);
+//      Map<String, String> spec = new HashMap<String, String>(1);
+//      spec.put(partitionName,partitionValue);
+//      try {
+//        result.add(Warehouse.makePartName(spec, false));
+//      } catch (MetaException e) {
+//        throw new InvalidPartition(partitionName,partitionValue);
+//      }
+//    }
+//    return result;
+//  }
+
+  private static class StreamingTransactionBatch implements TransactionBatch {
+    private final List<Long> txnIds;
+    private int currentTxnIndex;
+    private final IMetaStoreClient msClient;
+    private final AbstractSerDe serde;
+    private final ObjectInspector inspector;
+
+    private TxnState state;
+    private final LockRequest lockRequest;
+    private final Path path;
+    private final OrcOutputFormat outf = new OrcOutputFormat();
+    private final RecordUpdater updater;
+
+    private StreamingTransactionBatch(int numTxns, IMetaStoreClient msClient,
+                HiveStreamingEndPoint hiveEndPoint, LockRequest lockRequest,
+                java.lang.Class serdeClass, HiveConf conf)
+            throws ConnectionError, InvalidPartition, StreamingIOFailure, StreamingException {
+      try {
+        this.msClient = msClient;
+        this.lockRequest = lockRequest;
+        txnIds = msClient.openTxns(numTxns).getTxn_ids();
+        state = TxnState.INACTIVE;
+
+        currentTxnIndex = -1;
+        state = TxnState.INACTIVE;
+
+        path = getPathForPartition(hiveEndPoint.database, hiveEndPoint.table,
+                hiveEndPoint.partitionVals);
+
+
+        serde = initSerde(serdeClass, conf, msClient, hiveEndPoint.database
+                , hiveEndPoint.table);
+        inspector = serde.getObjectInspector();
+
+
+        org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+
+
+        updater = outf.getRecordUpdater(path,
+                new AcidOutputFormat.Options(config)
+                        .inspector(inspector)
+                        .bucket(0)
+                        .minimumTransactionId(100)
+                        .maximumTransactionId(200)
+                        .useDummy(System.out)
+        );
+
+      } catch (TException e) {
+        throw new ConnectionError("Unable to fetch new transaction batch", e);
+      } catch (SerDeException e) {
+        throw new StreamingException("Unable to acquire object inspector from serde", e);
+      } catch (IOException e) {
+        throw new StreamingIOFailure("Unable to get Record Updater", e);
+      }
+    }
+
+    private static AbstractSerDe initSerde(Class serdeClass, HiveConf conf, IMetaStoreClient msClient, String database, String table) throws ConnectionError, InvalidTable, SerializationError {
+      try {
+        Hive meta = Hive.get(conf,false);
+        Table tbl = meta.getTable(database,table);
+        Properties tblProps = MetaStoreUtils.getTableMetadata(tbl.getTTable());
+
+        // create the serde and initialize it
+        AbstractSerDe serde = (AbstractSerDe)
+                ReflectionUtils.newInstance(serdeClass, conf);
+        serde.initialize(conf, tblProps);
+        return serde;
+
+      } catch (HiveException e) {
+        throw new ConnectionError("Unable to connect to hive", e);
+      } catch (SerDeException e) {
+        throw new SerializationError("Error initializing serde",e);
+      }
+    }
+
+    private Path getPathForPartition(String database, String table,
+                List<String> partitionVals)
+            throws InvalidPartition, StreamingException {
+      try {
+        String location = msClient.getPartition(database,table,partitionVals).getSd().getLocation();
+        return new Path(location);
+      } catch (TException e) {
+        throw new StreamingException(e.getMessage()
+                + ". Unable to get path for specified partition: " + partitionVals, e);
+      }
+    }
+
+    public void beginNextTransaction() throws StreamingException {
+      if(currentTxnIndex >= txnIds.size())
+        throw new InvalidTrasactionState("No more transactions available in" +
+                " current batch");
+      ++currentTxnIndex;
+
+      try {
+        LockResponse res = msClient.lock(lockRequest);
+        if(res.getState() != LockState.ACQUIRED) {
+          throw new StreamingException("Unable to acquire partition lock");
+        }
+      } catch (TException e) {
+        throw new StreamingException("Unable to acquire partition lock", e);
+      }
+
+      state = TxnState.OPEN;
+    }
+
+    private Long getCurrentTxnId() {
+      return txnIds.get(currentTxnIndex);
+    }
+
+    public TxnState getCurrentTransactionState() {
+      return state;
+    }
+
+    // active txn is not considered part of remaining txn
+    public int remainingTransactions() {
+      return txnIds.size() - currentTxnIndex + 1;
+    }
+
+
+    // Write Data //
+    @Override
+    public void write(byte[] record)
+            throws ConnectionError, IOException, StreamingException {
+      Object row = serializeToObject(record);
+      updater.insert(getCurrentTxnId(), 1, row);
+    }
+
+    private Object serializeToObject(byte[] record) throws SerializationError {
+      try {
+        BytesWritable blob = new BytesWritable();
+        blob.set(record, 0, record.length);
+        return serde.deserialize(blob);
+      } catch (SerDeException e) {
+        throw new SerializationError("Unable to convert byte[] record into Object", e);
+      }
+    }
+
+    public void write(Collection<byte[]> records)
+            throws ConnectionError, IOException, StreamingException {
+      for(byte[] record : records) {
+        write(record);
+      }
+    }
+
+
+    @Override
+    public void commit() throws StreamingException {
+      try {
+        updater.flush();
+        msClient.commitTxn(txnIds.get(currentTxnIndex));
+        state = TxnState.COMMITTED;
+      } catch (NoSuchTxnException e) {
+        throw new InvalidTrasactionState("Invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TxnAbortedException e) {
+        throw new InvalidTrasactionState("Aborted transaction cannot be committed"
+                , e);
+      } catch (TException e) {
+        throw new StreamingException("Unable to commit transaction"
+                + getCurrentTxnId(), e);
+      } catch (IOException e) {
+        throw new StreamingException("Unable to commit transaction", e);
+      }
+    }
+
+    @Override
+    public void abort() throws StreamingException {
+      try {
+        msClient.rollbackTxn(getCurrentTxnId());
+        state = TxnState.ABORTED;
+      } catch (NoSuchTxnException e) {
+        throw new InvalidTrasactionState("Invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TException e) {
+        throw new StreamingException("Unable to abort transaction id : "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    @Override
+    public void close() {
+      try {
+        updater.close(false);
+      } catch (IOException e) {
+      }
+    }
+  } // class StreamingTransactionBatch
+
+} // class HiveStreamingEndPoint
+
+// Racing to create new partition
diff --git streaming/src/java/org/apache/hive/streaming/InvalidPartition.java streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
new file mode 100644
index 0000000..94f6b6e
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidPartition extends StreamingException {
+
+  public InvalidPartition(String partitionName, String partitionValue) {
+    super("Invalid partition: Name=" + partitionName
+            + ", Value=" + partitionValue);
+  }
+
+}
diff --git streaming/src/java/org/apache/hive/streaming/InvalidTable.java streaming/src/java/org/apache/hive/streaming/InvalidTable.java
new file mode 100644
index 0000000..da862b7
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidTable.java
@@ -0,0 +1,16 @@
+package org.apache.hive.streaming;
+
+public class InvalidTable extends StreamingException {
+
+  private static String makeMsg(String db, String table) {
+    return "Invalid table db:" + db + ", table:" + table;
+  }
+
+  public InvalidTable(String db, String table, Exception cause) {
+    super(makeMsg(db,table), cause);
+  }
+
+  public InvalidTable(String db, String table) {
+    super(makeMsg(db,table));
+  }
+}
diff --git streaming/src/java/org/apache/hive/streaming/SerializationError.java streaming/src/java/org/apache/hive/streaming/SerializationError.java
new file mode 100644
index 0000000..f2d4b6f
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/SerializationError.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+
+public class SerializationError extends StreamingException {
+  public SerializationError(String msg, SerDeException e) {
+    super(msg,e);
+  }
+}
diff --git streaming/src/java/org/apache/hive/streaming/StreamingConnection.java streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
new file mode 100644
index 0000000..79a5e85
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public interface StreamingConnection {
+  /**
+   * Acquire a set of transactions
+   * @param numTransactionsHint a hint about the number of desired transactions
+   * @return a batch of transactions
+   */
+  public TransactionBatch fetchTransactionBatch(int numTransactionsHint)
+          throws ConnectionError, InvalidPartition, StreamingException;
+
+  /**
+   * Close connection
+   */
+  public void close();
+
+}
diff --git streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
new file mode 100644
index 0000000..fb09488
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class StreamingIOFailure extends StreamingException {
+
+  public StreamingIOFailure(String msg, Exception cause) {
+    super(msg, cause);
+  }
+
+  public StreamingIOFailure(String msg) {
+    super(msg);
+  }
+}
diff --git streaming/src/java/org/apache/hive/streaming/StreamingWriter.java streaming/src/java/org/apache/hive/streaming/StreamingWriter.java
deleted file mode 100644
index 793465e..0000000
--- streaming/src/java/org/apache/hive/streaming/StreamingWriter.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.thrift.TException;
-
-import java.util.Collection;
-import java.util.List;
-
-public class StreamingWriter {
-
-  private final IMetaStoreClient msClient;
-  private final HiveStreamingEndPoint hiveEndPoint;
-  private List<Long> txnIds;
-  private int currentTxnIndex;
-
-  public enum TxnState { NOT_ACTIVE, OPEN, COMMITTED, ABORTED }
-  private TxnState txnState;
-  private final LockRequest lockRequest;
-
-  public StreamingWriter(HiveStreamingEndPoint hiveEndPoint)
-          throws ConnectionError {
-    this.hiveEndPoint = hiveEndPoint;
-
-    HiveConf conf = new HiveConf();
-    conf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveEndPoint.metaStoreUri);
-    try {
-      msClient = new HiveMetaStoreClient(conf);
-    } catch (MetaException e) {
-      throw new ConnectionError("Error connecting to metatore URI: "
-              + hiveEndPoint.metaStoreUri, e);
-    }
-    currentTxnIndex = -1;
-    txnState = TxnState.NOT_ACTIVE;
-
-    this.lockRequest = createLockRequest(hiveEndPoint);
-  }
-
-  public void close() {
-    msClient.close();
-    txnState = TxnState.NOT_ACTIVE;
-  }
-
-  // Transaction Support //
-  public void fetchTransactionBatch(int numTransactions)
-          throws ConnectionError {
-    try {
-      txnState = TxnState.NOT_ACTIVE;
-      txnIds = msClient.openTxns(numTransactions).getTxn_ids();
-    } catch (TException e) {
-      throw new ConnectionError("Error fetching new transaction batch", e);
-    }
-  }
-
-  public void beginNextTransaction() throws StreamingException {
-    if(currentTxnIndex >= txnIds.size())
-      throw new InvalidTrasactionState("No more transactions available in the" +
-              " current batch");
-    ++currentTxnIndex;
-
-    try {
-      LockResponse res = msClient.lock(lockRequest);
-      if(res.getState() != LockState.ACQUIRED) {
-        throw new StreamingException("Unable to acquire partition lock");
-      }
-    } catch (TException e) {
-      throw new StreamingException("Unable to acquire partition lock", e);
-    }
-
-    txnState = TxnState.OPEN;
-  }
-
-  private Long getCurrentTxnId() {
-    return txnIds.get(currentTxnIndex);
-  }
-
-  public void commit() throws StreamingException {
-    try {
-      msClient.commitTxn(txnIds.get(currentTxnIndex));
-      txnState = TxnState.COMMITTED;
-    } catch (NoSuchTxnException e) {
-      throw new InvalidTrasactionState("Invalid transaction id : "
-              + getCurrentTxnId(), e);
-    } catch (TxnAbortedException e) {
-      throw new InvalidTrasactionState("Aborted transaction cannot be committed"
-              , e);
-    } catch (TException e) {
-      throw new StreamingException("Unable to commit transaction"
-              + getCurrentTxnId(), e);
-    }
-  }
-
-  public void abort() throws StreamingException {
-    try {
-      msClient.rollbackTxn(getCurrentTxnId());
-      txnState = TxnState.ABORTED;
-    } catch (NoSuchTxnException e) {
-      throw new InvalidTrasactionState("Invalid transaction id : "
-              + getCurrentTxnId(), e);
-    } catch (TException e) {
-      throw new StreamingException("Unable to abort transaction id : "
-              + getCurrentTxnId(), e);
StreamingException("Unable to abort transaction id : " - + getCurrentTxnId(), e); - } - } - - public TxnState getTransactionState() { - return txnState; - } - - // active txn is not considered part of remaining txn - public int remainingTransactions() { - return txnIds.size() - currentTxnIndex + 1; - } - - - // Write Data // - public void write(byte[] record) - throws ConnectionError { - /// - } - public void write(Collection records) - throws ConnectionError { - - } - - private static LockRequest createLockRequest(HiveStreamingEndPoint hiveEndPoint) { - LockRequestBuilder rqstBuilder = new LockRequestBuilder(); - for( String partitionName : hiveEndPoint.partitionSpec.keySet() ) { - rqstBuilder.addLockComponent(new LockComponentBuilder() - .setDbName(hiveEndPoint.database) - .setTableName(hiveEndPoint.table) - .setPartitionName(partitionName) - .setShared() - .build()); - } - return rqstBuilder.build(); - } - -} diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatch.java streaming/src/java/org/apache/hive/streaming/TransactionBatch.java new file mode 100644 index 0000000..e772d39 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/TransactionBatch.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+
+public interface TransactionBatch {
+  public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED }
+
+  public void beginNextTransaction() throws StreamingException;
+
+//  private Long getCurrentTxnId(); //
+
+  public TxnState getCurrentTransactionState();
+  public void commit() throws StreamingException;
+  public void abort() throws StreamingException;
+
+  // active txn is not considered part of remaining txn //
+  public int remainingTransactions();
+
+
+  // Write Data for current Txn //
+  public void write(byte[] record) throws ConnectionError, IOException, StreamingException;
+  public void write(Collection<byte[]> records) throws ConnectionError, IOException, StreamingException;
+
+  public void close();
+}
diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java
new file mode 100644
index 0000000..1c90ec3
--- /dev/null
+++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestStreaming {
+  final static String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+//  private final static String jdbcString = "jdbc:derby:;databaseName=metastore_db;create=true";
+//  private final static String jdbcDriver = "org.apache.derby.jdbc.EmbeddedDriver";
+
+  private final HiveConf conf;
+  private final IMetaStoreClient msClient;
+  private final Hive hive;
+
+  final static String nameNodeURI = null;
+  final static String metaStoreURI = null;
+
+  final static String dbName = "testing";
+  final static String tblName = "alerts";
+
+  List<String> partitionVals;
+
+  public TestStreaming() throws MetaException, HiveException {
+    conf = setUpHiveConf();
+    hive = Hive.get();
+    msClient = hive.getMSC();
+
+    partitionVals = new ArrayList<String>(2);
+    partitionVals.add("Asia");
+    partitionVals.add("India");
+  }
+
+
+  private static List<FieldSchema> getPartitionKeys() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    //Defining partition names in unsorted order
+    fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, ""));
+    fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+
+  @Before
+  public void setup() throws Exception {
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+
+    Hive hive = Hive.get();
+    IMetaStoreClient msClient = hive.getMSC();
+    dropDB(msClient, dbName);
+    createDbAndTable(msClient, dbName, tblName, partitionVals);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    dropDB(msClient,dbName);
+    TxnDbUtil.cleanDb();
+  }
+
+  @Test
+  public void testEndpointConnection() throws Exception {
+    HiveStreamingEndPoint hiveEP = new HiveStreamingEndPoint(nameNodeURI,
+            metaStoreURI, dbName, tblName, partitionVals);
+    StreamingConnection connection = hiveEP.newConnection(serdeClass, true); // should not throw
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyCommit() throws Exception {
+    HiveStreamingEndPoint hiveEP = new HiveStreamingEndPoint(
+            "localhost://namnode.uri", null /* Metastore */,
+            dbName, tblName, partitionVals);
+    StreamingConnection connection = hiveEP.newConnection(serdeClass, true);
+
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(10);
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyAbort() throws Exception {
+    HiveStreamingEndPoint hiveEP = new HiveStreamingEndPoint(
+            "localhost://namnode.uri", null /* Metastore */,
+            dbName, tblName, partitionVals);
+    StreamingConnection connection = hiveEP.newConnection(serdeClass, true);
+
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(10);
+    txnBatch.beginNextTransaction();
+    txnBatch.abort();
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommit() throws Exception {
+    HiveStreamingEndPoint hiveEP = new HiveStreamingEndPoint(
+            "localhost://namnode.uri", null /* Metastore */,
+            dbName, tblName, partitionVals);
+    StreamingConnection connection = hiveEP.newConnection(serdeClass, true);
+
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(10);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1Hello streaming".getBytes());
+    txnBatch.write("2Welcome to streaming".getBytes());
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchAbort() throws Exception {
+    HiveStreamingEndPoint hiveEP = new HiveStreamingEndPoint(
+            "localhost://namnode.uri", null /* Metastore */,
+            dbName, tblName, partitionVals);
+    StreamingConnection connection = hiveEP.newConnection(serdeClass, true);
+
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(10);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1Hello streaming".getBytes());
+    txnBatch.write("2Welcome to streaming".getBytes());
+    txnBatch.abort();
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+
+
+  protected HiveConf setUpHiveConf() {
+    HiveConf hiveConf = new HiveConf(this.getClass());
+//    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+//    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+//    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+//    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+    return hiveConf;
+  }
+
+
+  // delete db and all tables in it
+  public static void dropDB(IMetaStoreClient client, String databaseName) {
+    try {
+      for(String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
+        client.dropTable(databaseName, table, true, true);
+      }
+      client.dropDatabase(databaseName);
+    } catch (TException e) {
+    }
+
+  }
+
+  public static void createDbAndTable(IMetaStoreClient client, String databaseName,
+              String tableName, List<String> partVals) throws Exception {
+    Database db = new Database();
+    db.setName(databaseName);
+    client.createDatabase(db);
+
+    Table tbl = new Table();
+    tbl.setDbName(databaseName);
+    tbl.setTableName(tableName);
+    tbl.setTableType(TableType.MANAGED_TABLE.toString());
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setCols(getTableColumns());
+    tbl.setPartitionKeys(getPartitionKeys());
+
+    tbl.setSd(sd);
+
+    sd.setBucketCols(new ArrayList<String>(2));
+    sd.setSerdeInfo(new SerDeInfo());
+    sd.getSerdeInfo().setName(tbl.getTableName());
+    sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+    sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+
+    sd.getSerdeInfo().setSerializationLib(serdeClass());
+    sd.setInputFormat(inputFormat());
+    sd.setOutputFormat(outputFormat());
+
+    Map<String, String> tableParams = new HashMap<String, String>();
+    tbl.setParameters(tableParams);
+    client.createTable(tbl);
+
+    addPartition(client, tbl, partVals);
+  }
+
+
+  private static void addPartition(IMetaStoreClient client, Table tbl, List<String> partValues)
+          throws IOException, TException {
+    Partition part = new Partition();
+    part.setDbName(dbName);
+    part.setTableName(tblName);
+    part.setSd(tbl.getSd());
+    part.setValues(partValues);
+    client.add_partition(part);
+  }
+
+  private static String serdeClass() {
+//    ColumnarSerDe.class.getName();
+    return LazySimpleSerDe.class.getName();
+  }
+
+  protected static String inputFormat() {
+    return RCFileInputFormat.class.getName();
+  }
+
+  protected static String outputFormat() {
+    return RCFileOutputFormat.class.getName();
+  }
+
+  private static List<FieldSchema> getTableColumns() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("id", serdeConstants.INT_TYPE_NAME, ""));
+    fields.add(new FieldSchema("msg", serdeConstants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+}
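
For reference, a minimal usage sketch of the streaming API added by this patch, distilled from the TestStreaming cases above. It is not part of the patch; the database, table, partition values, and serde class mirror the test fixtures, and the helper method name is only illustrative.

  // Illustrative only -- mirrors testTransactionBatchCommit() above.
  void writeOneBatch(List<String> partitionVals) throws Exception {
    HiveStreamingEndPoint endPoint = new HiveStreamingEndPoint(
        null /* nameNodeUri */, null /* metaStoreUri */, "testing", "alerts", partitionVals);
    // 'true' selects the embedded metastore, as in the tests
    StreamingConnection connection =
        endPoint.newConnection("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", true);
    try {
      TransactionBatch txnBatch = connection.fetchTransactionBatch(10);
      txnBatch.beginNextTransaction();
      txnBatch.write("1Hello streaming".getBytes());
      txnBatch.commit();
      txnBatch.close();
    } finally {
      connection.close();
    }
  }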