diff --git pom.xml pom.xml index 41f5337..f5fba08 100644 --- pom.xml +++ pom.xml @@ -41,6 +41,7 @@ ql serde service + streaming shims testutils packaging diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java index 1458075..8dfe24f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java @@ -56,8 +56,7 @@ public void run(HookContext hookContext) throws Exception { this.run(ss,inputs,outputs,ugi); } - public void run(SessionState sess, Set inputs, - Set outputs, UserGroupInformation ugi) + public void run(SessionState sess, Set inputs, Set outputs, UserGroupInformation ugi) throws Exception { if (sess.getConf().getBoolean("hive.test.init.phase", false) == true) { return; diff --git streaming/pom.xml streaming/pom.xml new file mode 100644 index 0000000..eae34b4 --- /dev/null +++ streaming/pom.xml @@ -0,0 +1,100 @@ + + + + 4.0.0 + + org.apache.hive + hive + 0.13.0-SNAPSHOT + ../pom.xml + + + hive-streaming + jar + Hive Streaming + + + .. + + + + + + + org.apache.hive + hive-serde + ${project.version} + + + org.apache.hive + hive-metastore + ${project.version} + + + org.apache.hive + hive-metastore + ${project.version} + + + org.apache.hive + hive-exec + ${project.version} + + + org.apache.hive + hive-cli + ${project.version} + + + + org.apache.hadoop + hadoop-core + ${hadoop-20S.version} + + + + + junit + junit + ${junit.version} + test + + + + + + ${basedir}/src/java + ${basedir}/src/test + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + diff --git streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java new file mode 100644 index 0000000..dd773d9 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +/** + * Uses LazySimple serde to enode the record + */ +public abstract class AbstractLazySimpleRecordWriter implements RecordWriter { + + private final Path partitionPath; + private final int totalBuckets; + + private OrcOutputFormat outf = new OrcOutputFormat(); + private HiveConf conf = new HiveConf(); + private RecordUpdater updater = null; + private final ArrayList tableColumns; + private final static String serdeClassName = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + private LazySimpleSerDe serde; + private ObjectInspector inspector; + private Random rand = new Random(); + private int currentBucketId = 0; + + public final static char outputFieldSeparator + = (char)LazySimpleSerDe.DefaultSeparators[0]; + + + protected AbstractLazySimpleRecordWriter(HiveEndPoint endPoint) + throws ConnectionError, StreamingException, ClassNotFoundException { + try { + if(endPoint.metaStoreUri!=null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri); + } + + Hive hive = Hive.get(conf, false); + + Table tbl = hive.getTable(endPoint.database, endPoint.table); + Properties tableProps = MetaStoreUtils.getTableMetadata(tbl.getTTable()); + + this.serde = createSerde(tableProps, conf); + this.inspector = this.serde.getObjectInspector(); + this.tableColumns = getPartitionCols(tbl); + this.partitionPath = getPathForPartition(hive, endPoint.database, + endPoint.table, endPoint.partitionVals); + this.totalBuckets = hive.getTable(endPoint.database, endPoint.table) + .getNumBuckets(); + + } catch (HiveException e) { + throw new ConnectionError("Problem connecting to Hive", e); + } catch (SerDeException e) { + throw new StreamingException("Failed to create serde " + serdeClassName, e); + } + + } + + private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) + throws StreamingException { + try { + return outf.getRecordUpdater(partitionPath, + new AcidOutputFormat.Options(conf) + .inspector(inspector) + .bucket(bucketId) + .minimumTransactionId(minTxnId) + .maximumTransactionId(maxTxnID) + .useDummy(System.out) ); + } catch (IOException e) { + throw new StreamingException("Failed to get record updater", e); + } + } + + protected abstract byte[] reorderFields(byte[] record) + throws UnsupportedEncodingException; + + public ArrayList getTableColumns() { + return tableColumns; + } + + @Override + public void write(long transactionId, byte[] record) + throws 
SerializationError, StreamingIOFailure { + try { + byte[] orderedFields = reorderFields(record); + Object encodedRow = encode(orderedFields); + updater.insert(transactionId, currentBucketId, encodedRow); + } catch (IOException e) { + throw new StreamingIOFailure("Error writing record in transaction(" + + transactionId + ")", e); + } + } + + @Override + public void flush() throws StreamingIOFailure { + try { + updater.flush(); + } catch (IOException e) { + throw new StreamingIOFailure("Unable to flush recordUpdater", e); + } + } + + + @Override + public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException { + this.currentBucketId = rand.nextInt(totalBuckets); + updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID); + } + + @Override + public void closeBatch() throws StreamingIOFailure { + try { + updater.close(false); + updater = null; + } catch (IOException e) { + throw new StreamingIOFailure("Unable to close recordUpdater", e); + } + } + + private Object encode(byte[] record) throws SerializationError { + try { + BytesWritable blob = new BytesWritable(); + blob.set(record, 0, record.length); + return serde.deserialize(blob); + } catch (SerDeException e) { + throw new SerializationError("Unable to convert byte[] record into Object", e); + } + } + + private static LazySimpleSerDe createSerde(Properties tableProps, HiveConf conf) + throws ClassNotFoundException, SerializationError { + try { + Class serdeClass = Class.forName(serdeClassName); + LazySimpleSerDe serde = + (LazySimpleSerDe) ReflectionUtils.newInstance(serdeClass, conf); + serde.initialize(conf, tableProps); + return serde; + } catch (SerDeException e) { + throw new SerializationError("Error initializing serde", e); + } + } + + private Path getPathForPartition(Hive hive, String database, String table, + List partitionVals) + throws StreamingException { + try { + IMetaStoreClient msClient = hive.getMSC(); + String location = + msClient.getPartition(database,table,partitionVals).getSd().getLocation(); + msClient.close(); + return new Path(location); + } catch (TException e) { + throw new StreamingException(e.getMessage() + + ". Unable to get partitionPath for specified partition: " + + partitionVals, e); + } + } + + private ArrayList getPartitionCols(Table table) { + List cols = table.getCols(); + ArrayList colNames = new ArrayList(cols.size()); + for(FieldSchema col : cols) { + colNames.add(col.getName().toLowerCase()); + } + return colNames; + } + +} diff --git streaming/src/java/org/apache/hive/streaming/ConnectionError.java streaming/src/java/org/apache/hive/streaming/ConnectionError.java new file mode 100644 index 0000000..e0ff1da --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/ConnectionError.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class ConnectionError extends StreamingException { + + public ConnectionError(String msg, Exception innerEx) { + super(msg, innerEx); + } + public ConnectionError(Exception e) { + super(e.getMessage(), e); + } +} diff --git streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java new file mode 100644 index 0000000..1ce3086 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.List; + +/** + * Streaming Writer handles delimited input (eg. CSV). + * Delimited input is parsed & reordered to match column order in table + */ +public class DelimitedInputWriter extends AbstractLazySimpleRecordWriter { + private String delimiter; + private int[] fieldToColMapping; + + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint) + throws ClassNotFoundException, StreamingException, IOException { + super(endPoint); + this.delimiter = delimiter; + + this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns()); + } + + private int[] getFieldReordering(String[] colNamesForFields, List tableColNames) + throws InvalidColumn { + int[] result = new int[ colNamesForFields.length ]; + for(int i=0; i partitionVals; + + public HiveEndPoint(String metaStoreUri + , String database, String table, List partitionVals) { + this.metaStoreUri = metaStoreUri; + this.database = database; + this.table = table; + this.partitionVals = partitionVals; + } + + public StreamingConnection newConnection(String user, boolean createPartIfNotExists) + throws ConnectionError, InvalidPartition, ClassNotFoundException + , StreamingException { + if(createPartIfNotExists) { + createPartitionIfNotExists(); + } + return new ConnectionImpl(this, user); + } + + private boolean createPartitionIfNotExists() throws InvalidTable, StreamingException { + HiveConf conf = new HiveConf(); + IMetaStoreClient msClient = getMetaStoreClient(this, conf); + + try { + Partition part = new Partition(); + try { + Table table1 = msClient.getTable(database, table); + part.setDbName(database); + part.setTableName(table); + part.setValues(partitionVals); + part.setParameters(new HashMap()); + part.setSd(table1.getSd()); + } catch (NoSuchObjectException e) { + throw new InvalidTable(database, table); + } catch (TException e) { + throw new StreamingException("Cannot connect to table 
DB:" + + database + ", Table: " + table, e); + } + + try { + msClient.add_partition(part); + return true; + } catch (AlreadyExistsException e) { + return false; + } catch (TException e) { + throw new StreamingException("Partition creation failed"); + } + } finally { + msClient.close(); + } + } + + + // uses embedded store if endpoint.metastoreUri is null + private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf) + throws ConnectionError { + if(endPoint.metaStoreUri== null) { + TxnDbUtil.setConfValues(conf); + } else { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri); + } + + try { + return Hive.get(conf).getMSC(); + } catch (MetaException e) { + throw new ConnectionError("Error connecting to Hive Metastore URI: " + + endPoint.metaStoreUri, e); + } catch (HiveException e) { + throw new ConnectionError("Error connecting to Hive Metastore URI: " + + endPoint.metaStoreUri, e); + } + } + + + + private static class ConnectionImpl implements StreamingConnection { + private final HiveConf conf; + private final IMetaStoreClient msClient; + private final LockRequest lockRequest; + private String user; + private Random rand = new Random(); // used for generating bucketIDs + + private ConnectionImpl(HiveEndPoint endPoint, String user) + throws ConnectionError, InvalidPartition, ClassNotFoundException { + this.conf = new HiveConf(); + this.msClient = getMetaStoreClient(endPoint, conf); + this.user = user; + this.lockRequest = createLockRequest(endPoint); + } + + public void close() { + msClient.close(); + } + + @Override + public TransactionBatch fetchTransactionBatch(int numTransactions, + RecordWriter recordWriter) + throws ConnectionError, InvalidPartition, StreamingException { + return new TransactionBatchImpl(user, numTransactions, msClient, + lockRequest, recordWriter); + } + + private static LockRequest createLockRequest(HiveEndPoint hiveEndPoint) + throws InvalidPartition { + LockRequestBuilder rqstBuilder = new LockRequestBuilder(); + + for( String partition : hiveEndPoint.partitionVals ) { + rqstBuilder.addLockComponent(new LockComponentBuilder() + .setDbName(hiveEndPoint.database) + .setTableName(hiveEndPoint.table) + .setPartitionName(partition) + .setShared() + .build()); + } + return rqstBuilder.build(); + } + } // class ConnectionImpl + + private static class TransactionBatchImpl implements TransactionBatch { + private final List txnIds; + private int currentTxnIndex; + private final IMetaStoreClient msClient; + private final RecordWriter recordWriter; + + private TxnState state; + private final LockRequest lockRequest; + + private TransactionBatchImpl(String user, int numTxns, IMetaStoreClient msClient, + LockRequest lockRequest, + RecordWriter recordWriter) + throws StreamingException { + try { + this.msClient = msClient; + + this.lockRequest = lockRequest; + this.recordWriter = recordWriter; + this.txnIds = msClient.openTxns(user, numTxns).getTxn_ids(); + this.currentTxnIndex = -1; + this.state = TxnState.INACTIVE; + recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); + } catch (TException e) { + throw new ConnectionError("Unable to fetch new transaction batch", e); + } + } + + public void beginNextTransaction() throws StreamingException { + if(currentTxnIndex >= txnIds.size()) + throw new InvalidTrasactionState("No more transactions available in" + + " current batch"); + ++currentTxnIndex; + + try { + LockResponse res = msClient.lock(lockRequest); + if(res.getState() != LockState.ACQUIRED) { + throw new 
StreamingException("Unable to acquire partition lock"); + } + } catch (TException e) { + throw new StreamingException("Unable to acquire partition lock", e); + } + + state = TxnState.OPEN; + } + + public Long getCurrentTxnId() { + return txnIds.get(currentTxnIndex); + } + + public TxnState getCurrentTransactionState() { + return state; + } + + // active txn is not considered part of remaining txn + public int remainingTransactions() { + return txnIds.size() - currentTxnIndex + 1; + } + + + /** Write using RecordWriter */ + @Override + public void write(byte[] record) + throws ConnectionError, IOException, StreamingException { + recordWriter.write(getCurrentTxnId(), record); + } + + public void write(Collection records) + throws ConnectionError, IOException, StreamingException { + for(byte[] record : records) { + write(record); + } + } + + + @Override + public void commit() throws StreamingException { + try { + recordWriter.flush(); + msClient.commitTxn(txnIds.get(currentTxnIndex)); + state = TxnState.COMMITTED; + } catch (NoSuchTxnException e) { + throw new InvalidTrasactionState("Invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TxnAbortedException e) { + throw new InvalidTrasactionState("Aborted transaction cannot be committed" + , e); + } catch (TException e) { + throw new StreamingException("Unable to commit transaction" + + getCurrentTxnId(), e); + } + } + + @Override + public void abort() throws StreamingException { + try { + msClient.rollbackTxn(getCurrentTxnId()); + state = TxnState.ABORTED; + } catch (NoSuchTxnException e) { + throw new InvalidTrasactionState("Invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TException e) { + throw new StreamingException("Unable to abort transaction id : " + + getCurrentTxnId(), e); + } + } + + @Override + public void close() throws StreamingException { + state = TxnState.INACTIVE; + recordWriter.closeBatch(); + } + } // class TransactionBatchImpl + +} // class HiveEndPoint + +// Racing to create new partition diff --git streaming/src/java/org/apache/hive/streaming/InvalidColumn.java streaming/src/java/org/apache/hive/streaming/InvalidColumn.java new file mode 100644 index 0000000..08d8fdf --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/InvalidColumn.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +public class InvalidColumn extends StreamingException { + + public InvalidColumn(String msg) { + super(msg); + } +} diff --git streaming/src/java/org/apache/hive/streaming/InvalidPartition.java streaming/src/java/org/apache/hive/streaming/InvalidPartition.java new file mode 100644 index 0000000..94f6b6e --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/InvalidPartition.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class InvalidPartition extends StreamingException { + + public InvalidPartition(String partitionName, String partitionValue) { + super("Invalid partition: Name=" + partitionName + + ", Value=" + partitionValue); + } + +} diff --git streaming/src/java/org/apache/hive/streaming/InvalidTable.java streaming/src/java/org/apache/hive/streaming/InvalidTable.java new file mode 100644 index 0000000..c92fe2a --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/InvalidTable.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class InvalidTable extends ConnectionError { + + private static String makeMsg(String db, String table) { + return "Invalid table db:" + db + ", table:" + table; + } + + public InvalidTable(String db, String table, Exception cause) { + super(makeMsg(db,table), cause); + } + + public InvalidTable(String db, String table) { + super(makeMsg(db,table), null); + } +} + diff --git streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java new file mode 100644 index 0000000..80334c5 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class InvalidTrasactionState extends RuntimeException { + public InvalidTrasactionState(String msg) { + super(msg); + } + + public InvalidTrasactionState(String msg, Exception e) { + super(msg,e); + } +} diff --git streaming/src/java/org/apache/hive/streaming/RecordWriter.java streaming/src/java/org/apache/hive/streaming/RecordWriter.java new file mode 100644 index 0000000..e70fec9 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/RecordWriter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public interface RecordWriter { + + /** Write using RecordUpdater */ + public void write(long transactionId, byte[] record) throws StreamingException; + /** flush records */ + public void flush() throws StreamingException; + + /** acquire a new RecordUpdater */ + public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException; + /** close the RecordUpdater */ + public void closeBatch() throws StreamingException; +} + diff --git streaming/src/java/org/apache/hive/streaming/SerializationError.java streaming/src/java/org/apache/hive/streaming/SerializationError.java new file mode 100644 index 0000000..6844aca --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/SerializationError.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +public class SerializationError extends StreamingException { + public SerializationError(String msg, Exception e) { + super(msg,e); + } +} diff --git streaming/src/java/org/apache/hive/streaming/StreamingConnection.java streaming/src/java/org/apache/hive/streaming/StreamingConnection.java new file mode 100644 index 0000000..5146a5a --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +public interface StreamingConnection { + + /** + * Acquire a set of transactions + * @param numTransactionsHint a hint about the number of desired transactions + * @return a batch of transactions + */ + public TransactionBatch fetchTransactionBatch(int numTransactionsHint, + RecordWriter writer) + throws ConnectionError, StreamingException; + + /** + * Close connection + */ + public void close(); + +} diff --git streaming/src/java/org/apache/hive/streaming/StreamingException.java streaming/src/java/org/apache/hive/streaming/StreamingException.java new file mode 100644 index 0000000..9486d6d --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/StreamingException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +public class StreamingException extends Exception { + public StreamingException(String msg, Exception cause) { + super(msg, cause); + } + public StreamingException(String msg) { + super(msg); + } +} diff --git streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java new file mode 100644 index 0000000..fb09488 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +public class StreamingIOFailure extends StreamingException { + + public StreamingIOFailure(String msg, Exception cause) { + super(msg, cause); + } + + public StreamingIOFailure(String msg) { + super(msg); + } +} diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatch.java streaming/src/java/org/apache/hive/streaming/TransactionBatch.java new file mode 100644 index 0000000..cf96bef --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/TransactionBatch.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + + +public interface TransactionBatch { + public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED } + + public void beginNextTransaction() throws StreamingException; + + public Long getCurrentTxnId(); + + public TxnState getCurrentTransactionState(); + public void commit() throws StreamingException; + public void abort() throws StreamingException; + + // active txn is not considered part of remaining txn // + public int remainingTransactions(); + + + // Write Data for current Txn // + public void write(byte[] record) throws ConnectionError, IOException, StreamingException; + public void write(Collection records) throws ConnectionError, IOException, StreamingException; + + public void close() throws StreamingException; +} diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java new file mode 100644 index 0000000..5a50903 --- /dev/null +++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +import junit.framework.Assert; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestStreaming { + + private static final String COL1 = "id"; + private static final String COL2 = "msg"; + + private final IMetaStoreClient msClient; + private final Hive hive; + + final static String metaStoreURI = null; + + final static String dbName = "testing"; + final static String tblName = "alerts"; + final String[] fieldNames; + + List partitionVals; + private String user = "roshan"; + + public TestStreaming() throws MetaException, HiveException { + hive = Hive.get(); + msClient = hive.getMSC(); + + partitionVals = new ArrayList(2); + partitionVals.add("Asia"); + partitionVals.add("India"); + + fieldNames = new String[]{COL1,COL2}; + } + + + private static List getPartitionKeys() { + List fields = new ArrayList(); + //Defining partition names in unsorted order + fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, "")); + fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, "")); + return fields; + } + + + @Before + public void setup() throws Exception { + TxnDbUtil.cleanDb(); + TxnDbUtil.prepDb(); + + Hive hive = Hive.get(); + IMetaStoreClient msClient = hive.getMSC(); + dropDB(msClient, dbName); + createDbAndTable(msClient, dbName, tblName, partitionVals); + } + + @After + public void tearDown() throws Exception { + dropDB(msClient,dbName); + TxnDbUtil.cleanDb(); + } + + @Test + public void testEndpointConnection() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName + , partitionVals); + StreamingConnection connection = endPt.newConnection(user, false); //shouldn't throw + connection.close(); + } + + @Test + public void testAddPartition() throws Exception { + List newPartVals = new ArrayList(2); + newPartVals.add("Asia"); + newPartVals.add("Nepal"); + + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName + , newPartVals); + + // Ensure partition is absent + try { + msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals); + Assert.assertTrue("Partition already exists", false); + } catch (NoSuchObjectException e) { + // expect this exception + } + + // Create partition + Assert.assertNotNull( endPt.newConnection(user,true) ); + + // Ensure partition is present + Partition p = 
msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals); + Assert.assertNotNull("Did not find added partition", p); + } + + @Test + public void testTransactionBatchEmptyCommit() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(user, false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.commit(); + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + txnBatch.close(); + connection.close(); + } + + @Test + public void testTransactionBatchEmptyAbort() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(user, true); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.abort(); + Assert.assertEquals(TransactionBatch.TxnState.ABORTED + , txnBatch.getCurrentTransactionState()); + txnBatch.close(); + connection.close(); + } + + @Test + public void testTransactionBatchCommit() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(user, true); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.commit(); + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("2,Welcome to streaming".getBytes()); + txnBatch.commit(); + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + txnBatch.close(); + connection.close(); + } + + @Test + public void testTransactionBatchAbort() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + StreamingConnection connection = endPt.newConnection(user, false); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.write("2,Welcome to streaming".getBytes()); + txnBatch.abort(); + Assert.assertEquals(TransactionBatch.TxnState.ABORTED + , txnBatch.getCurrentTransactionState()); + txnBatch.close(); + connection.close(); + } + + @Test + public void testMultipleTransactionBatchCommits() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(user, false); + + // 1st Txn Batch + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("1,Hello 
streaming".getBytes()); + txnBatch.commit(); + txnBatch.beginNextTransaction(); + txnBatch.write("2,Welcome to streaming".getBytes()); + txnBatch.commit(); + txnBatch.close(); + + // 2nd Txn Batch + txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("3,Hello streaming - once again".getBytes()); + txnBatch.commit(); + txnBatch.beginNextTransaction(); + txnBatch.write("4,Welcome to streaming - once again".getBytes()); + txnBatch.commit(); + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + + txnBatch.close(); + + connection.close(); + } + + + // delete db and all tables in it + public static void dropDB(IMetaStoreClient client, String databaseName) { + try { + for(String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) { + client.dropTable(databaseName, table, true, true); + } + client.dropDatabase(databaseName); + } catch (TException e) { + } + + } + + public static void createDbAndTable(IMetaStoreClient client, String databaseName, + String tableName, List partVals) + throws Exception { + Database db = new Database(); + db.setName(databaseName); + client.createDatabase(db); + + Table tbl = new Table(); + tbl.setDbName(databaseName); + tbl.setTableName(tableName); + tbl.setTableType(TableType.MANAGED_TABLE.toString()); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(getTableColumns()); + sd.setNumBuckets(10); + tbl.setPartitionKeys(getPartitionKeys()); + + tbl.setSd(sd); + + sd.setBucketCols(new ArrayList(2)); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tbl.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1"); + + sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName()); + sd.setInputFormat(inputFormat()); + sd.setOutputFormat(outputFormat()); + + Map tableParams = new HashMap(); + tbl.setParameters(tableParams); + client.createTable(tbl); + + addPartition(client, tbl, partVals); + } + + + private static void addPartition(IMetaStoreClient client, Table tbl + , List partValues) + throws IOException, TException { + Partition part = new Partition(); + part.setDbName(dbName); + part.setTableName(tblName); + part.setSd(tbl.getSd()); + part.setValues(partValues); + client.add_partition(part); + } + + protected static String inputFormat() { + return RCFileInputFormat.class.getName(); + } + + protected static String outputFormat() { + return RCFileOutputFormat.class.getName(); + } + + private static List getTableColumns() { + List fields = new ArrayList(); + fields.add(new FieldSchema(COL1, serdeConstants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, "")); + return fields; + } +}