diff --git metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index 1bbe02e..ad63dde 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -1051,6 +1051,24 @@ public void run() { thread.start(); loopUntilHMSReady(port); } + + public static void startMetaStore(final int port, + final HadoopThriftAuthBridge bridge, final HiveConf conf) throws Exception { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + HiveMetaStore.startMetaStore(port, bridge, conf); + } catch (Throwable e) { + LOG.error("Metastore Thrift Server threw an exception...",e); + } + } + }); + thread.setDaemon(true); + thread.start(); + loopUntilHMSReady(port); + } + /** * A simple connect test to make sure that the metastore is up * @throws Exception diff --git packaging/pom.xml packaging/pom.xml index de9b002..3601b93 100644 --- packaging/pom.xml +++ packaging/pom.xml @@ -132,6 +132,11 @@ ${project.version} + org.apache.hive + hive-streaming + ${project.version} + + org.apache.hive.hcatalog hive-hcatalog-hbase-storage-handler ${project.version} diff --git packaging/src/main/assembly/src.xml packaging/src/main/assembly/src.xml index bdaa47b..d9b0e6b 100644 --- packaging/src/main/assembly/src.xml +++ packaging/src/main/assembly/src.xml @@ -71,6 +71,7 @@ service/**/* shims/**/* testutils/**/* + streaming/**/* / diff --git pom.xml pom.xml index 7343683..5463069 100644 --- pom.xml +++ pom.xml @@ -47,6 +47,7 @@ service shims testutils + streaming packaging diff --git streaming/pom.xml streaming/pom.xml new file mode 100644 index 0000000..1e094bd --- /dev/null +++ streaming/pom.xml @@ -0,0 +1,123 @@ + + + + 4.0.0 + + org.apache.hive + hive + 0.14.0-SNAPSHOT + ../pom.xml + + + hive-streaming + jar + Hive Streaming + + + .. + + + + + hadoop-1 + + + org.apache.hadoop + hadoop-core + true + + + + + hadoop-2 + + + org.apache.hadoop + hadoop-common + true + + + org.apache.hadoop + hadoop-mapreduce-client-core + true + + + + + + + + + + org.apache.hive + hive-serde + ${project.version} + + + org.apache.hive + hive-metastore + ${project.version} + + + org.apache.hive + hive-exec + ${project.version} + + + org.apache.hive + hive-cli + ${project.version} + + + org.apache.hive.hcatalog + hive-hcatalog-core + true + ${project.version} + + + + + junit + junit + ${junit.version} + test + + + + + + ${basedir}/src/java + ${basedir}/src/test + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + diff --git streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java new file mode 100644 index 0000000..3182c08 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.thrift.TException; + +import java.io.IOException; + +import java.util.Random; + +abstract class AbstractRecordWriter implements RecordWriter { + static final private Log LOG = LogFactory.getLog(AbstractRecordWriter.class.getName()); + + final HiveConf conf; + final HiveEndPoint endPoint; + final Table tbl; + + final HiveMetaStoreClient msClient; + RecordUpdater updater = null; + + private final int totalBuckets; + private Random rand = new Random(); + private int currentBucketId = 0; + private final Path partitionPath; + + final OrcOutputFormat outf = new OrcOutputFormat(); + + protected AbstractRecordWriter(HiveEndPoint endPoint) + throws ConnectionError, StreamingException { + this.endPoint = endPoint; + conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri); + + try { + msClient = new HiveMetaStoreClient(conf); + this.tbl = msClient.getTable(endPoint.database, endPoint.table); + this.partitionPath = getPathForEndPoint(msClient, endPoint); + this.totalBuckets = tbl.getSd().getNumBuckets(); + if(totalBuckets <= 0) { + throw new StreamingException("Cannot stream to table that has not been bucketed : " + endPoint); + } + } catch (MetaException e) { + throw new ConnectionError(endPoint, e); + } catch (NoSuchObjectException e) { + throw new ConnectionError(endPoint, e); + } catch (TException e) { + throw new StreamingException(e.getMessage(), e); + } + + } + + abstract SerDe getSerde() throws SerializationError; + + @Override + public void flush() throws StreamingIOFailure { + try { + updater.flush(); + } catch (IOException e) { + throw new StreamingIOFailure("Unable to flush recordUpdater", e); + } + } + + @Override + public void clear() throws StreamingIOFailure { + return; + } + + /** + * Creates a new record updater for the new batch + * @param minTxnId + * @param maxTxnID + * @throws StreamingIOFailure if failed to create record updater + */ + @Override + public void newBatch(Long minTxnId, Long maxTxnID) + throws StreamingIOFailure, SerializationError { + try { + this.currentBucketId = rand.nextInt(totalBuckets); + LOG.debug("Creating Record updater"); + updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID); + } catch (IOException e) { + LOG.error("Failed creating record updater", e); + throw new StreamingIOFailure("Unable to get new record Updater", e); + } + } + + @Override + public void closeBatch() throws 
StreamingIOFailure { + try { + updater.close(false); + updater = null; + } catch (IOException e) { + throw new StreamingIOFailure("Unable to close recordUpdater", e); + } + } + + private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) + throws IOException, SerializationError { + try { + return outf.getRecordUpdater(partitionPath, + new AcidOutputFormat.Options(conf) + .inspector(getSerde().getObjectInspector()) + .bucket(bucketId) + .minimumTransactionId(minTxnId) + .maximumTransactionId(maxTxnID)); + } catch (SerDeException e) { + throw new SerializationError("Failed to get object inspector from Serde " + + getSerde().getClass().getName(), e); + } + } + + private Path getPathForEndPoint(HiveMetaStoreClient msClient, HiveEndPoint endPoint) + throws StreamingException { + try { + String location; + if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) { + location = msClient.getTable(endPoint.database,endPoint.table) + .getSd().getLocation(); + } else { + location = msClient.getPartition(endPoint.database, endPoint.table, + endPoint.partitionVals).getSd().getLocation(); + } + return new Path(location); + } catch (TException e) { + throw new StreamingException(e.getMessage() + + ". Unable to get path for end point: " + + endPoint.partitionVals, e); + } + } +} diff --git streaming/src/java/org/apache/hive/streaming/ConnectionError.java streaming/src/java/org/apache/hive/streaming/ConnectionError.java new file mode 100644 index 0000000..07c57dc --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/ConnectionError.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class ConnectionError extends StreamingException { + + public ConnectionError(String msg, Exception innerEx) { + super(msg, innerEx); + } + + public ConnectionError(HiveEndPoint endPoint, Exception innerEx) { + super("Error connecting to " + endPoint, innerEx); + } +} diff --git streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java new file mode 100644 index 0000000..2cf696c --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.io.BytesWritable; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Streaming Writer handles delimited input (eg. CSV). + * Delimited input is parsed & reordered to match column order in table + * Uses Lazy Simple Serde to process delimited input + */ +public class DelimitedInputWriter extends AbstractRecordWriter { + private final boolean reorderingNeeded; + private String delimiter; + private char serdeSeparator; + private int[] fieldToColMapping; + private final ArrayList tableColumns; + private AbstractSerDe serde = null; + + static final private Log LOG = LogFactory.getLog(HiveEndPoint.class.getName()); + + /** Constructor. Uses default separator of the LazySimpleSerde + * @param colNamesForFields Column name assignment for input fields. nulls or empty + * strings in the array indicates the fields to be skipped + * @param delimiter input field delimiter + * @param endPoint Hive endpoint + * @throws ConnectionError Problem talking to Hive + * @throws ClassNotFoundException Serde class not found + * @throws SerializationError Serde initialization/interaction failed + * @throws StreamingException Problem acquiring file system path for partition + * @throws InvalidColumn any element in colNamesForFields refers to a non existing column + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, + (char) LazySimpleSerDe.DefaultSeparators[0]); + } + + /** + * + * @param colNamesForFields Column name assignment for input fields + * @param delimiter input field delimiter + * @param endPoint Hive endpoint + * @param serdeSeparator separator used when encoding data that is fed into the + * LazySimpleSerde. 
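+ * (The LazySimpleSerde default, used by the other constructor, is the Ctrl-A character, ASCII 0x01.)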
Ensure this separator does not occur + * in the field data + * @throws ConnectionError Problem talking to Hive + * @throws ClassNotFoundException Serde class not found + * @throws SerializationError Serde initialization/interaction failed + * @throws StreamingException Problem acquiring file system path for partition + * @throws InvalidColumn any element in colNamesForFields refers to a non existing column + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, char serdeSeparator) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + super(endPoint); + this.tableColumns = getCols(tbl); + this.serdeSeparator = serdeSeparator; + + this.delimiter = delimiter; + this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns()); + this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns()); + LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint); + this.serdeSeparator = serdeSeparator; + } + + private boolean isReorderingNeeded(String delimiter, ArrayList tableColumns) { + return !( delimiter.equals(String.valueOf(getSerdeSeparator())) + && areFieldsInColOrder(fieldToColMapping) + && tableColumns.size()>=fieldToColMapping.length ); + } + + private static boolean areFieldsInColOrder(int[] fieldToColMapping) { + for(int i=0; i tableColNames) + throws InvalidColumn { + int[] result = new int[ colNamesForFields.length ]; + for(int i=0; itableColNames.size()) { + throw new InvalidColumn("Number of field names exceeds the number of columns in table"); + } + return result; + } + + // Reorder fields in record based on the order of columns in the table + protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException { + if(!reorderingNeeded) { + return record; + } + String[] reorderedFields = new String[getTableColumns().size()]; + String decoded = new String(record); + String[] fields = decoded.split(delimiter); + for (int i=0; i getTableColumns() { + return tableColumns; + } + + @Override + public void write(long transactionId, byte[] record) + throws SerializationError, StreamingIOFailure { + try { + byte[] orderedFields = reorderFields(record); + Object encodedRow = encode(orderedFields); + updater.insert(transactionId, encodedRow); + } catch (IOException e) { + throw new StreamingIOFailure("Error writing record in transaction (" + + transactionId + ")", e); + } + } + + @Override + SerDe getSerde() throws SerializationError { + if(serde!=null) { + return serde; + } + serde = createSerde(tbl, conf); + return serde; + } + + private Object encode(byte[] record) throws SerializationError { + try { + BytesWritable blob = new BytesWritable(); + blob.set(record, 0, record.length); + return serde.deserialize(blob); + } catch (SerDeException e) { + throw new SerializationError("Unable to convert byte[] record into Object", e); + } + } + + /** + * Creates LazySimpleSerde + * @return + * @throws SerializationError if serde could not be initialized + * @param tbl + */ + protected LazySimpleSerDe createSerde(Table tbl, HiveConf conf) + throws SerializationError { + try { + Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); + tableProps.setProperty("field.delim", String.valueOf(serdeSeparator)); + LazySimpleSerDe serde = new LazySimpleSerDe(); + serde.initialize(conf, tableProps); + return serde; + } catch (SerDeException e) { + throw new SerializationError("Error initializing serde", e); + } + } + + private 
ArrayList getCols(Table table) { + List cols = table.getSd().getCols(); + ArrayList colNames = new ArrayList(cols.size()); + for(FieldSchema col : cols) { + colNames.add(col.getName().toLowerCase()); + } + return colNames; + } + + public char getSerdeSeparator() { + return serdeSeparator; + } +} diff --git streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java new file mode 100644 index 0000000..109c01d --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import java.util.Collection; +import java.util.Set; + +public class HeartBeatFailure extends StreamingException { + private Collection abortedTxns; + private Collection nosuchTxns; + + public HeartBeatFailure(Collection abortedTxns, Set nosuchTxns) { + super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns); + this.abortedTxns = abortedTxns; + this.nosuchTxns = nosuchTxns; + } +} diff --git streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java new file mode 100644 index 0000000..f019dfc --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java @@ -0,0 +1,798 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Information about the hive end point (i.e. table or partition) to write to. + * A light weight object that does NOT internally hold on to resources such as + * network connections. It can be stored in Hashed containers such as sets and hash tables. + */ +public class HiveEndPoint { + public final String metaStoreUri; + public final String database; + public final String table; + public final ArrayList partitionVals; + public final HiveConf conf; + + + static final private Log LOG = LogFactory.getLog(HiveEndPoint.class.getName()); + + /** + * + * @param metaStoreUri URI of the metastore to connect to eg: thrift://localhost:9083 + * @param database Name of the Hive database + * @param table Name of table to stream to + * @param partitionVals Indicates the specific partition to stream to. Can be null or empty List if streaming to a + * table without partitions. The order of values in this list must correspond exactly to the + * order of partition columns specified during the table creation. E.g. For a table partitioned + * by (continent string, country string), partitionVals could be the list ("Asia", "India"). + */ + public HiveEndPoint(String metaStoreUri + , String database, String table, List partitionVals) { + this.metaStoreUri = metaStoreUri; + if(database==null) { + throw new IllegalArgumentException("Database cannot be null for HiveEndPoint"); + } + this.database = database; + this.table = table; + if(table==null) { + throw new IllegalArgumentException("Table cannot be null for HiveEndPoint"); + } + this.partitionVals = partitionVals==null ? 
new ArrayList() + : new ArrayList( partitionVals ); + this.conf = createHiveConf(this.getClass(),metaStoreUri); + } + + /** + * Acquire a new connection to MetaStore for streaming + * @param createPartIfNotExists If true, the partition specified in the endpoint + * will be auto created if it does not exist + * @return + * @throws ConnectionError if problem connecting + * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) + * @throws ImpersonationFailed if not able to impersonate 'proxyUser' + * @throws IOException if there was an I/O error when acquiring connection + * @throws PartitionCreationFailed if failed to create partition + * @throws InterruptedException + */ + public StreamingConnection newConnection(final boolean createPartIfNotExists) + throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed + , ImpersonationFailed , InterruptedException { + return newConnection(null, createPartIfNotExists); + } + + //TODO: make this function public once proxyUser is fully supported + /** + * Acquire a new connection to MetaStore for streaming + * @param proxyUser User on whose behalf all hdfs and hive operations will be + * performed on this connection. Set it to null or empty string + * to connect as user of current process without impersonation. + * @param createPartIfNotExists If true, the partition specified in the endpoint + * will be auto created if it does not exist + * @return + * @throws ConnectionError if problem connecting + * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) + * @throws ImpersonationFailed if not able to impersonate 'proxyUser' + * @throws IOException if there was an I/O error when acquiring connection + * @throws PartitionCreationFailed if failed to create partition + * @throws InterruptedException + */ + private StreamingConnection newConnection(final String proxyUser, final boolean createPartIfNotExists) + throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed + , ImpersonationFailed , InterruptedException { + if (proxyUser ==null || proxyUser.trim().isEmpty() ) { + return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists); + } + final UserGroupInformation ugi = getUserGroupInfo(proxyUser); + try { + return ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public StreamingConnection run() + throws ConnectionError, InvalidPartition, InvalidTable + , PartitionCreationFailed { + return newConnectionImpl(proxyUser, ugi, createPartIfNotExists); + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed to impersonate '" + proxyUser + + "' when acquiring connection", e); + } + } + + private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi, + boolean createPartIfNotExists) + throws ConnectionError, InvalidPartition, InvalidTable + , PartitionCreationFailed { + return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists); + } + + private static UserGroupInformation getUserGroupInfo(String proxyUser) + throws ImpersonationFailed { + try { + return UserGroupInformation.createProxyUser( + proxyUser, UserGroupInformation.getLoginUser()); + } catch (IOException e) { + LOG.error("Unable to login as proxy user. 
Exception follows.", e); + throw new ImpersonationFailed(proxyUser,e); + } + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) { + return false; + } + + HiveEndPoint endPoint = (HiveEndPoint) o; + + if ( database != null + ? !database.equals(endPoint.database) + : endPoint.database != null ) { + return false; + } + if ( metaStoreUri != null + ? !metaStoreUri.equals(endPoint.metaStoreUri) + : endPoint.metaStoreUri != null ) { + return false; + } + if (!partitionVals.equals(endPoint.partitionVals)) { + return false; + } + if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0; + result = 31 * result + (database != null ? database.hashCode() : 0); + result = 31 * result + (table != null ? table.hashCode() : 0); + result = 31 * result + partitionVals.hashCode(); + return result; + } + + @Override + public String toString() { + return "{" + + "metaStoreUri='" + metaStoreUri + '\'' + + ", database='" + database + '\'' + + ", table='" + table + '\'' + + ", partitionVals=" + partitionVals + " }"; + } + + + private static class ConnectionImpl implements StreamingConnection { + private final IMetaStoreClient msClient; + private final HiveEndPoint endPt; + private final String proxyUser; + private final UserGroupInformation ugi; + + /** + * + * @param endPoint + * @param proxyUser can be null + * @param ugi of prody user. If ugi is null, impersonation of proxy user will be disabled + * @param conf + * @param createPart create the partition if it does not exist + * @throws ConnectionError if there is trouble connecting + * @throws InvalidPartition if specified partition does not exist (and createPart=false) + * @throws InvalidTable if specified table does not exist + * @throws PartitionCreationFailed if createPart=true and not able to create partition + */ + private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi, + HiveConf conf, boolean createPart) + throws ConnectionError, InvalidPartition, InvalidTable + , PartitionCreationFailed { + this.proxyUser = proxyUser; + this.endPt = endPoint; + this.ugi = ugi; + this.msClient = getMetaStoreClient(endPoint, conf); + if(createPart && !endPoint.partitionVals.isEmpty()) { + createPartitionIfNotExists(endPoint, msClient, conf); + } + } + + /** + * Close connection + */ + @Override + public void close() { + if(ugi==null) { + msClient.close(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + msClient.close(); + return null; + } + } ); + } catch (IOException e) { + LOG.error("Error closing connection to " + endPt, e); + } catch (InterruptedException e) { + LOG.error("Interrupted when closing connection to " + endPt, e); + } + } + + + /** + * Acquires a new batch of transactions from Hive. + * + * @param numTransactions is a hint from client indicating how many transactions client needs. + * @param recordWriter Used to write record. The same writer instance can + * be shared with another TransactionBatch (to the same endpoint) + * only after the first TransactionBatch has been closed. + * Writer will be closed when the TransactionBatch is closed. 
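+ * <p>
+ * A minimal usage sketch (the metastore URI, database, table, partition values,
+ * field names and delimiter below are illustrative placeholders):
+ * <pre>
+ *   HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "mydb",
+ *       "alerts", Arrays.asList("Asia", "India"));
+ *   StreamingConnection conn = endPt.newConnection(true);
+ *   RecordWriter writer = new DelimitedInputWriter(
+ *       new String[]{"id", "msg"}, ",", endPt);
+ *   TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
+ *   txnBatch.beginNextTransaction();
+ *   txnBatch.write("1,Hello streaming".getBytes());
+ *   txnBatch.commit();
+ *   txnBatch.close();
+ *   conn.close();
+ * </pre>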
+ * @return + * @throws StreamingIOFailure if failed to create new RecordUpdater for batch + * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch + * @throws ImpersonationFailed failed to run command as proxyUser + * @throws InterruptedException + */ + public TransactionBatch fetchTransactionBatch(final int numTransactions, + final RecordWriter recordWriter) + throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed, InterruptedException { + if(ugi==null) { + return fetchTransactionBatchImpl(numTransactions, recordWriter); + } + try { + return ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public TransactionBatch run() throws StreamingException { + return fetchTransactionBatchImpl(numTransactions, recordWriter); + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when acquiring Transaction Batch on endPoint " + endPt, e); + } + } + + private TransactionBatch fetchTransactionBatchImpl(int numTransactions, + RecordWriter recordWriter) + throws StreamingException, TransactionBatchUnAvailable { + return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient, recordWriter); + } + + + private static void createPartitionIfNotExists(HiveEndPoint ep, + IMetaStoreClient msClient, HiveConf conf) + throws InvalidTable, PartitionCreationFailed { + if(ep.partitionVals.isEmpty()) { + return; + } + SessionState state = SessionState.start(new CliSessionState(conf)); + Driver driver = new Driver(conf); + + try { + if(LOG.isDebugEnabled()) { + LOG.debug("Attempting to create partition (if not existent) " + ep); + } + + List partKeys = msClient.getTable(ep.database, ep.table) + .getPartitionKeys(); + runDDL(driver, "use " + ep.database); + String query = "alter table " + ep.table + " add if not exists partition " + + partSpecStr(partKeys, ep.partitionVals); + runDDL(driver, query); + } catch (MetaException e) { + LOG.error("Failed to create partition : " + ep, e); + throw new PartitionCreationFailed(ep, e); + } catch (NoSuchObjectException e) { + LOG.error("Failed to create partition : " + ep, e); + throw new InvalidTable(ep.database, ep.table); + } catch (TException e) { + LOG.error("Failed to create partition : " + ep, e); + throw new PartitionCreationFailed(ep, e); + } catch (QueryFailedException e) { + LOG.error("Failed to create partition : " + ep, e); + throw new PartitionCreationFailed(ep, e); + } finally { + driver.close(); + try { + state.close(); + } catch (IOException e) { + LOG.warn("Error closing SessionState used to run Hive DDL."); + } + } + } + + private static boolean runDDL(Driver driver, String sql) throws QueryFailedException { + int retryCount = 1; // # of times to retry if first attempt fails + for(int attempt=0; attempt<=retryCount; ++attempt) { + try { + if(LOG.isDebugEnabled()) { + LOG.debug("Running Hive Query: "+ sql); + } + driver.run(sql); + return true; + } catch (CommandNeedRetryException e) { + if(attempt==retryCount) { + throw new QueryFailedException(sql, e); + } + continue; + } + } // for + return false; + } + + private static String partSpecStr(List partKeys, ArrayList partVals) { + if(partKeys.size()!=partVals.size()) { + throw new IllegalArgumentException("Partition values:" + partVals + ", does not match the partition Keys in table :" + partKeys ); + } + StringBuffer buff = new StringBuffer(partKeys.size()*20); + buff.append(" ( "); + int i=0; + for(FieldSchema schema : partKeys) { + buff.append(schema.getName()); + 
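// quote each value: builds name='value' pairs separated by commas, e.g. ( continent='Asia',country='India' )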
buff.append("='"); + buff.append(partVals.get(i)); + buff.append("'"); + if(i!=partKeys.size()-1) { + buff.append(","); + } + ++i; + } + buff.append(" )"); + return buff.toString(); + } + + private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf) + throws ConnectionError { + + if(endPoint.metaStoreUri!= null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri); + } + + try { + return new HiveMetaStoreClient(conf); + } catch (MetaException e) { + throw new ConnectionError("Error connecting to Hive Metastore URI: " + + endPoint.metaStoreUri, e); + } + } + + + } // class ConnectionImpl + + private static class TransactionBatchImpl implements TransactionBatch { + private final String proxyUser; + private final UserGroupInformation ugi; + private final HiveEndPoint endPt; + private final IMetaStoreClient msClient; + private final RecordWriter recordWriter; + private final List txnIds; + + private int currentTxnIndex; + private final String partNameForLock; + + private TxnState state; + private LockRequest lockRequest = null; + + /** + * Represents a batch of transactions acquired from MetaStore + * + * @param proxyUser + * @param ugi + * @param endPt + * @param numTxns + * @param msClient + * @param recordWriter + * @throws StreamingException if failed to create new RecordUpdater for batch + * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch + */ + private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt + , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter) + throws StreamingException, TransactionBatchUnAvailable { + try { + if( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) { + Table tableObj = msClient.getTable(endPt.database, endPt.table); + List partKeys = tableObj.getPartitionKeys(); + partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals); + } else { + partNameForLock = null; + } + this.proxyUser = proxyUser; + this.ugi = ugi; + this.endPt = endPt; + this.msClient = msClient; + this.recordWriter = recordWriter; + this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids(); + this.currentTxnIndex = -1; + this.state = TxnState.INACTIVE; + recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); + } catch (TException e) { + throw new TransactionBatchUnAvailable(endPt, e); + } + } + + @Override + public String toString() { + if(txnIds==null || txnIds.isEmpty()) { + return "{}"; + } + return "TxnIds=[" + txnIds.get(0) + ".." 
+ txnIds.get(txnIds.size()-1) + "] on endPoint= " + endPt; + } + + /** + * Activate the next available transaction in the current transaction batch + * @throws TransactionError failed to switch to next transaction + */ + @Override + public void beginNextTransaction() throws TransactionError, ImpersonationFailed, + InterruptedException { + if(ugi==null) { + beginNextTransactionImpl(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws TransactionError { + beginNextTransactionImpl(); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser + + "' when switching to next transaction for endPoint :" + endPt, e); + } + } + + private void beginNextTransactionImpl() throws TransactionError { + if( currentTxnIndex + 1 >= txnIds.size() ) + throw new InvalidTrasactionState("No more transactions available in" + + " current batch for end point : " + endPt); + ++currentTxnIndex; + lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId()); + try { + LockResponse res = msClient.lock(lockRequest); + if(res.getState() != LockState.ACQUIRED) { + throw new TransactionError("Unable to acquire lock on " + endPt); + } + } catch (TException e) { + throw new TransactionError("Unable to acquire lock on " + endPt, e); + } + + state = TxnState.OPEN; + } + + /** + * Get the id of the currently open transaction + * @return transaction id + */ + @Override + public Long getCurrentTxnId() { + return txnIds.get(currentTxnIndex); + } + + /** + * Get the state of the current transaction + * @return transaction state + */ + @Override + public TxnState getCurrentTransactionState() { + return state; + } + + /** + * Remaining transactions are those that have not yet been committed or aborted. + * The currently active transaction is not counted among the remaining txns. + * @return number of transactions remaining in this batch. + */ + @Override + public int remainingTransactions() { + if(currentTxnIndex>=0) { + return txnIds.size() - currentTxnIndex -1; + } + return txnIds.size(); + } + + + /** + * Write record using RecordWriter + * @param record the data to be written + * @throws StreamingIOFailure I/O failure + * @throws SerializationError serialization error + * @throws ImpersonationFailed error writing on behalf of proxyUser + * @throws InterruptedException + */ + @Override + public void write(final byte[] record) + throws StreamingException, InterruptedException, + ImpersonationFailed { + if(ugi==null) { + writeImpl(record); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws StreamingException { + writeImpl(record); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when writing to endPoint :" + endPt + ". 
Transaction Id: " + + getCurrentTxnId(), e); + } + } + + public void writeImpl(byte[] record) + throws StreamingException { + recordWriter.write(getCurrentTxnId(), record); + } + + + /** + * Write records using RecordWriter + * @param records collection of rows to be written + * @throws StreamingException serialization error + * @throws ImpersonationFailed error writing on behalf of proxyUser + * @throws InterruptedException + */ + @Override + public void write(final Collection records) + throws StreamingException, InterruptedException, + ImpersonationFailed { + if(ugi==null) { + writeImpl(records); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + writeImpl(records); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser + + "' when writing to endPoint :" + endPt + ". Transaction Id: " + + getCurrentTxnId(), e); + } + } + + private void writeImpl(Collection records) + throws StreamingException { + for(byte[] record : records) { + writeImpl(record); + } + } + + + /** + * Commit the currently open transaction + * @throws TransactionError + * @throws StreamingIOFailure if flushing records failed + * @throws ImpersonationFailed if + * @throws InterruptedException + */ + @Override + public void commit() throws TransactionError, StreamingException, + ImpersonationFailed, InterruptedException { + if(ugi==null) { + commitImpl(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + commitImpl(); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when committing Txn on endPoint :" + endPt + ". Transaction Id: " + + getCurrentTxnId(), e); + } + + } + + private void commitImpl() throws TransactionError, StreamingException { + try { + recordWriter.flush(); + msClient.commitTxn(txnIds.get(currentTxnIndex)); + state = TxnState.COMMITTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TxnAbortedException e) { + throw new TransactionError("Aborted transaction cannot be committed" + , e); + } catch (TException e) { + throw new TransactionError("Unable to commit transaction" + + getCurrentTxnId(), e); + } + } + + /** + * Abort the currently open transaction + * @throws TransactionError + */ + @Override + public void abort() throws TransactionError, StreamingException + , ImpersonationFailed, InterruptedException { + if(ugi==null) { + abortImpl(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + abortImpl(); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when aborting Txn on endPoint :" + endPt + ". 
Transaction Id: " + + getCurrentTxnId(), e); + } + } + + private void abortImpl() throws TransactionError, StreamingException { + try { + recordWriter.clear(); + msClient.rollbackTxn(getCurrentTxnId()); + state = TxnState.ABORTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Unable to abort invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TException e) { + throw new TransactionError("Unable to abort transaction id : " + + getCurrentTxnId(), e); + } + } + + @Override + public void heartbeat() throws StreamingException, HeartBeatFailure { + Long first = txnIds.get(currentTxnIndex); + Long last = txnIds.get(txnIds.size()-1); + try { + HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last); + if(!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { + throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch()); + } + } catch (TException e) { + throw new StreamingException("Failure to heartbeat on ids (" + first + ".." + + last + ") on end point : " + endPt ); + } + } + + /** + * Close the TransactionBatch + * @throws StreamingIOFailure I/O failure when closing transaction batch + */ + @Override + public void close() throws StreamingException, ImpersonationFailed, InterruptedException { + if(ugi==null) { + state = TxnState.INACTIVE; + recordWriter.closeBatch(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + state = TxnState.INACTIVE; + recordWriter.closeBatch(); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when closing Txn Batch on endPoint :" + endPt, e); + } + } + + private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint, + String partNameForLock, String user, long txnId) { + LockRequestBuilder rqstBuilder = new LockRequestBuilder(); + rqstBuilder.setUser(user); + rqstBuilder.setTransactionId(txnId); + + LockComponentBuilder lockCompBuilder = new LockComponentBuilder() + .setDbName(hiveEndPoint.database) + .setTableName(hiveEndPoint.table) + .setShared(); + if(partNameForLock!=null && !partNameForLock.isEmpty() ) { + lockCompBuilder.setPartitionName(partNameForLock); + } + rqstBuilder.addLockComponent(lockCompBuilder.build()); + + return rqstBuilder.build(); + } + } // class TransactionBatchImpl + + static HiveConf createHiveConf(Class clazz, String metaStoreUri) { + HiveConf conf = new HiveConf(clazz); + conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, + "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true); + if(metaStoreUri!= null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri); + } + return conf; + } + +} // class HiveEndPoint diff --git streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java new file mode 100644 index 0000000..6526124 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class ImpersonationFailed extends StreamingException { + public ImpersonationFailed(String username, Exception e) { + super("Failed to impersonate user " + username, e); + } +} diff --git streaming/src/java/org/apache/hive/streaming/InvalidColumn.java streaming/src/java/org/apache/hive/streaming/InvalidColumn.java new file mode 100644 index 0000000..08d8fdf --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/InvalidColumn.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class InvalidColumn extends StreamingException { + + public InvalidColumn(String msg) { + super(msg); + } +} diff --git streaming/src/java/org/apache/hive/streaming/InvalidPartition.java streaming/src/java/org/apache/hive/streaming/InvalidPartition.java new file mode 100644 index 0000000..94f6b6e --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/InvalidPartition.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +public class InvalidPartition extends StreamingException { + + public InvalidPartition(String partitionName, String partitionValue) { + super("Invalid partition: Name=" + partitionName + + ", Value=" + partitionValue); + } + +} diff --git streaming/src/java/org/apache/hive/streaming/InvalidTable.java streaming/src/java/org/apache/hive/streaming/InvalidTable.java new file mode 100644 index 0000000..d4552ec --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/InvalidTable.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class InvalidTable extends StreamingException { + + private static String makeMsg(String db, String table) { + return "Invalid table db:" + db + ", table:" + table; + } + + public InvalidTable(String db, String table, Exception cause) { + super(makeMsg(db,table), cause); + } + + public InvalidTable(String db, String table) { + super(makeMsg(db,table), null); + } +} diff --git streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java new file mode 100644 index 0000000..ed5ccfc --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +public class InvalidTrasactionState extends TransactionError { + public InvalidTrasactionState(String msg) { + super(msg); + } + + public InvalidTrasactionState(String msg, Exception e) { + super(msg,e); + } +} diff --git streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java new file mode 100644 index 0000000..82fde58 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class PartitionCreationFailed extends StreamingException { + public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) { + super("Failed to create partition " + endPoint, cause); + } + + public PartitionCreationFailed(String msg, Exception cause) { + super(msg, cause); + } +} diff --git streaming/src/java/org/apache/hive/streaming/QueryFailedException.java streaming/src/java/org/apache/hive/streaming/QueryFailedException.java new file mode 100644 index 0000000..7cbde61 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/QueryFailedException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import org.apache.hadoop.hive.ql.CommandNeedRetryException; + +public class QueryFailedException extends StreamingException { + String query; + public QueryFailedException(String query, CommandNeedRetryException e) { + super("Query failed: " + query + ". 
Due to :" + e.getMessage(), e); + this.query = query; + } +} diff --git streaming/src/java/org/apache/hive/streaming/RecordWriter.java streaming/src/java/org/apache/hive/streaming/RecordWriter.java new file mode 100644 index 0000000..0edb50b --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/RecordWriter.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +public interface RecordWriter { + + /** Write using RecordUpdater + * + * @param transactionId the ID of the Txn in which the write occurs + * @param record the record to be written + */ + public void write(long transactionId, byte[] record) throws StreamingException; + + /** Flush records from buffer. Invoked by TransactionBatch.commit() */ + public void flush() throws StreamingException; + + /** Clear bufferred writes. Invoked by TransactionBatch.abort() */ + public void clear() throws StreamingException; + + /** Acquire a new RecordUpdater. Invoked when + * StreamingConnection.fetchTransactionBatch() is called */ + public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException; + + /** Close the RecordUpdater. Invoked by TransactionBatch.close() */ + public void closeBatch() throws StreamingException; +} diff --git streaming/src/java/org/apache/hive/streaming/SerializationError.java streaming/src/java/org/apache/hive/streaming/SerializationError.java new file mode 100644 index 0000000..6844aca --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/SerializationError.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + + +public class SerializationError extends StreamingException { + public SerializationError(String msg, Exception e) { + super(msg,e); + } +} diff --git streaming/src/java/org/apache/hive/streaming/StreamingConnection.java streaming/src/java/org/apache/hive/streaming/StreamingConnection.java new file mode 100644 index 0000000..8480ea9 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +/** + * Represents a connection to a HiveEndPoint. Used to acquire transaction batches. + */ +public interface StreamingConnection { + + /** + * Acquires a new batch of transactions from Hive. + + * @param numTransactionsHint is a hint from client indicating how many transactions client needs. + * @param writer Used to write record. The same writer instance can + * be shared with another TransactionBatch (to the same endpoint) + * only after the first TransactionBatch has been closed. + * Writer will be closed when the TransactionBatch is closed. + * @return + * @throws ConnectionError + * @throws InvalidPartition + * @throws StreamingException + * @return a batch of transactions + */ + public TransactionBatch fetchTransactionBatch(int numTransactionsHint, + RecordWriter writer) + throws ConnectionError, StreamingException, InterruptedException; + + /** + * Close connection + */ + public void close(); + +} diff --git streaming/src/java/org/apache/hive/streaming/StreamingException.java streaming/src/java/org/apache/hive/streaming/StreamingException.java new file mode 100644 index 0000000..9486d6d --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/StreamingException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +public class StreamingException extends Exception { + public StreamingException(String msg, Exception cause) { + super(msg, cause); + } + public StreamingException(String msg) { + super(msg); + } +} diff --git streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java new file mode 100644 index 0000000..fb09488 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + + +public class StreamingIOFailure extends StreamingException { + + public StreamingIOFailure(String msg, Exception cause) { + super(msg, cause); + } + + public StreamingIOFailure(String msg) { + super(msg); + } +} diff --git streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java new file mode 100644 index 0000000..271f969 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.io.Text; +import org.apache.hive.hcatalog.data.JsonSerDe; + +import java.io.IOException; +import java.util.Properties; + +/** + * Streaming Writer handles utf8 encoded Json (Strict syntax). 
+ * Uses org.apache.hive.hcatalog.data.JsonSerDe to process Json input + */ +public class StrictJsonWriter extends AbstractRecordWriter { + private JsonSerDe serde; + + /** + * + * @param endPoint the end point to write to + * @throws ConnectionError + * @throws SerializationError + * @throws StreamingException + */ + public StrictJsonWriter(HiveEndPoint endPoint) + throws ConnectionError, SerializationError, StreamingException { + super(endPoint); + } + + @Override + SerDe getSerde() throws SerializationError { + if(serde!=null) { + return serde; + } + serde = createSerde(tbl, conf); + return serde; + } + + @Override + public void write(long transactionId, byte[] record) + throws StreamingIOFailure, SerializationError { + try { + Object encodedRow = encode(record); + updater.insert(transactionId, encodedRow); + } catch (IOException e) { + throw new StreamingIOFailure("Error writing record in transaction(" + + transactionId + ")", e); + } + + } + + /** + * Creates JsonSerDe + * @param tbl used to create serde + * @param conf used to create serde + * @return + * @throws SerializationError if serde could not be initialized + */ + private static JsonSerDe createSerde(Table tbl, HiveConf conf) + throws SerializationError { + try { + Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); + JsonSerDe serde = new JsonSerDe(); + serde.initialize(conf, tableProps); + return serde; + } catch (SerDeException e) { + throw new SerializationError("Error initializing serde " + JsonSerDe.class.getName(), e); + } + } + + /** + * Encode Utf8 encoded string bytes using JsonSerde + * @param utf8StrRecord + * @return The encoded object + * @throws SerializationError + */ + private Object encode(byte[] utf8StrRecord) throws SerializationError { + try { + Text blob = new Text(utf8StrRecord); + return serde.deserialize(blob); + } catch (SerDeException e) { + throw new SerializationError("Unable to convert byte[] record into Object", e); + } + } + +} diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatch.java streaming/src/java/org/apache/hive/streaming/TransactionBatch.java new file mode 100644 index 0000000..e9e42fa --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/TransactionBatch.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +/** + * Represents a set of Transactions returned by Hive. Supports opening, writing to + * and commiting/aborting each transaction. The interface is designed to ensure + * transactions in a batch are used up sequentially. Multiple transaction batches can be + * used (initialized with separate RecordWriters) for concurrent streaming + * + */ +public interface TransactionBatch { + public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED } + + /** + * Activate the next available transaction in the current transaction batch + * @throws StreamingException + */ + public void beginNextTransaction() throws StreamingException, InterruptedException; + + /** + * Get Id of currently open transaction + * @return transaction id + */ + public Long getCurrentTxnId(); + + /** + * get state of current transaction + */ + public TxnState getCurrentTransactionState(); + + /** + * Commit the currently open transaction + * @throws StreamingException + */ + public void commit() throws StreamingException, InterruptedException; + + /** + * Abort the currently open transaction + * @throws StreamingException + */ + public void abort() throws StreamingException, InterruptedException; + + /** + * Remaining transactions are the ones that are not committed or aborted or open. + * Currently open transaction is not considered part of remaining txns. + * @return number of transactions remaining this batch. 
+ */ + public int remainingTransactions(); + + + /** + * Write record using RecordWriter + * @param record the data to be written + * @throws ConnectionError + * @throws IOException + * @throws StreamingException + */ + public void write(byte[] record) throws StreamingException, InterruptedException; + + /** + * Write records using RecordWriter + * @param records collection of rows to be written + * @throws ConnectionError + * @throws IOException + * @throws StreamingException + */ + public void write(Collection records) throws StreamingException, InterruptedException; + + + /** + * Issues a heartbeat to hive metastore on the current and remaining txn ids + * to keep them from expiring + * @throws StreamingException + * @throws HeartBeatFailure + */ + public void heartbeat() throws StreamingException, HeartBeatFailure; + + /** + * Close the TransactionBatch + * @throws StreamingException + */ + public void close() throws StreamingException, InterruptedException; +} diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java new file mode 100644 index 0000000..01883da --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +public class TransactionBatchUnAvailable extends StreamingException { + public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) { + super("Unable to acquire transaction batch on end point: " + ep, e); + } +} diff --git streaming/src/java/org/apache/hive/streaming/TransactionError.java streaming/src/java/org/apache/hive/streaming/TransactionError.java new file mode 100644 index 0000000..1acb4cc --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/TransactionError.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +public class TransactionError extends StreamingException { + public TransactionError(String msg, Exception e) { + super(msg, e); + } + + public TransactionError(String msg) { + super(msg); + } +} diff --git streaming/src/test/org/apache/hive/streaming/StreamingIntegrationTester.java streaming/src/test/org/apache/hive/streaming/StreamingIntegrationTester.java new file mode 100644 index 0000000..5864783 --- /dev/null +++ streaming/src/test/org/apache/hive/streaming/StreamingIntegrationTester.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.Parser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.LogUtils; +import org.apache.hadoop.util.StringUtils; + +import java.util.Arrays; +import java.util.Random; + +/** + * A stand alone utility to write data into the streaming ingest interface. 
+ */ +public class StreamingIntegrationTester { + + static final private Log LOG = LogFactory.getLog(StreamingIntegrationTester.class.getName()); + + public static void main(String[] args) { + + try { + LogUtils.initHiveLog4j(); + } catch (LogUtils.LogInitializationException e) { + System.err.println("Unable to initialize log4j " + StringUtils.stringifyException(e)); + System.exit(-1); + } + + Options options = new Options(); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("abort-pct") + .withDescription("Percentage of transactions to abort, defaults to 5") + .withLongOpt("abortpct") + .create('a')); + + options.addOption(OptionBuilder + .hasArgs() + .withArgName("column-names") + .withDescription("column names of table to write to") + .withLongOpt("columns") + .withValueSeparator(',') + .isRequired() + .create('c')); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("database") + .withDescription("Database of table to write to") + .withLongOpt("database") + .isRequired() + .create('d')); + +// options.addOption(OptionBuilder +// .hasArg() +// .withArgName("user") +// .withDescription("User to impersonate") +// .withLongOpt("user") +// .create('u')); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("frequency") + .withDescription("How often to commit a transaction, in seconds, defaults to 1") + .withLongOpt("frequency") + .create('f')); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("iterations") + .withDescription("Number of batches to write, defaults to 10") + .withLongOpt("num-batches") + .create('i')); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("metastore-uri") + .withDescription("URI of Hive metastore") + .withLongOpt("metastore-uri") + .isRequired() + .create('m')); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("num_transactions") + .withDescription("Number of transactions per batch, defaults to 100") + .withLongOpt("num-txns") + .create('n')); + + options.addOption(OptionBuilder + .hasArgs() + .withArgName("partition-values") + .withDescription("partition values, must be provided in order of partition columns, " + + "if not provided table is assumed to not be partitioned") + .withLongOpt("partition") + .withValueSeparator(',') + .create('p')); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("records-per-transaction") + .withDescription("records to write in each transaction, defaults to 100") + .withLongOpt("records-per-txn") + .withValueSeparator(',') + .create('r')); + + options.addOption(OptionBuilder + .hasArgs() + .withArgName("column-types") + .withDescription("column types, valid values are string, int, float, decimal, date, " + + "datetime") + .withLongOpt("schema") + .withValueSeparator(',') + .isRequired() + .create('s')); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("table") + .withDescription("Table to write to") + .withLongOpt("table") + .isRequired() + .create('t')); + + options.addOption(OptionBuilder + .hasArg() + .withArgName("num-writers") + .withDescription("Number of writers to create, defaults to 2") + .withLongOpt("writers") + .create('w')); + + options.addOption(OptionBuilder + .hasArg(false) + .withArgName("pause") + .withDescription("pause for keyboard input on every commit and batch close. 
disabled by default") + .withLongOpt("pause") + .create('x')); + + + Parser parser = new GnuParser(); + CommandLine cmdline = null; + try { + cmdline = parser.parse(options, args); + } catch (ParseException e) { + usage(options); + } + + boolean pause = cmdline.hasOption('x'); +// String user = cmdline.getOptionValue('u'); + String db = cmdline.getOptionValue('d'); + String table = cmdline.getOptionValue('t'); + String uri = cmdline.getOptionValue('m'); + int txnsPerBatch = Integer.valueOf(cmdline.getOptionValue('n', "100")); + int writers = Integer.valueOf(cmdline.getOptionValue('w', "2")); + int batches = Integer.valueOf(cmdline.getOptionValue('i', "10")); + int recordsPerTxn = Integer.valueOf(cmdline.getOptionValue('r', "100")); + int frequency = Integer.valueOf(cmdline.getOptionValue('f', "1")); + int ap = Integer.valueOf(cmdline.getOptionValue('a', "5")); + float abortPct = ((float)ap) / 100.0f; + String[] partVals = cmdline.getOptionValues('p'); + String[] cols = cmdline.getOptionValues('c'); + String[] types = cmdline.getOptionValues('s'); + + StreamingIntegrationTester sit = new StreamingIntegrationTester(db, table, uri, + txnsPerBatch, writers, batches, recordsPerTxn, frequency, abortPct, partVals, cols, types, pause); + sit.go(); + } + + static void usage(Options options) { + HelpFormatter hf = new HelpFormatter(); + hf.printHelp(HelpFormatter.DEFAULT_WIDTH, "sit [options]", "Usage: ", options, ""); + System.exit(-1); + } + + private String user; + private String db; + private String table; + private String uri; + private int txnsPerBatch; + private int writers; + private int batches; + private int recordsPerTxn; + private int frequency; + private float abortPct; + private String[] partVals; + private String[] cols; + private String[] types; + private boolean pause; + + + private StreamingIntegrationTester(String db, String table, String uri, int txnsPerBatch, + int writers, int batches, int recordsPerTxn, + int frequency, float abortPct, String[] partVals, + String[] cols, String[] types, boolean pause) { + this.db = db; + this.table = table; + this.uri = uri; + this.txnsPerBatch = txnsPerBatch; + this.writers = writers; + this.batches = batches; + this.recordsPerTxn = recordsPerTxn; + this.frequency = frequency; + this.abortPct = abortPct; + this.partVals = partVals; + this.cols = cols; + this.types = types; + this.pause = pause; + } + + private void go() { + HiveEndPoint endPoint = null; + try { + if(partVals == null) { + endPoint = new HiveEndPoint(uri, db, table, null); + } else { + endPoint = new HiveEndPoint(uri, db, table, Arrays.asList(partVals)); + } + + for (int i = 0; i < writers; i++) { + Writer w = new Writer(endPoint, user, i, txnsPerBatch, batches, recordsPerTxn, frequency, abortPct, + cols, types, pause); + w.start(); + } + + } catch (Throwable t) { + System.err.println("Caught exception while testing: " + StringUtils.stringifyException(t)); + } + } + + private static class Writer extends Thread { + private HiveEndPoint endPoint; + private final String user; + private int txnsPerBatch; + private int batches; + private int writerNumber; + private int recordsPerTxn; + private int frequency; + private float abortPct; + private String[] cols; + private String[] types; + private boolean pause; + private Random rand; + + Writer(HiveEndPoint endPoint, String user, int writerNumber, int txnsPerBatch, int batches, + int recordsPerTxn, int frequency, float abortPct, String[] cols, String[] types, boolean pause) { + this.endPoint = endPoint; + this.user = user; + 
this.txnsPerBatch = txnsPerBatch; + this.batches = batches; + this.writerNumber = writerNumber; + this.recordsPerTxn = recordsPerTxn; + this.frequency = frequency; + this.abortPct = abortPct; + this.cols = cols; + this.types = types; + this.pause = pause; + rand = new Random(); + } + + @Override + public void run() { + StreamingConnection conn = null; + try { + conn = endPoint.newConnection(true); + RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint); + + long start = System.currentTimeMillis(); + for (int i = 0; i < batches; i++) { + LOG.info("Starting batch " + i); + TransactionBatch batch = conn.fetchTransactionBatch(txnsPerBatch, writer); + try { + while (batch.remainingTransactions() > 0) { + batch.beginNextTransaction(); + for (int j = 0; j < recordsPerTxn; j++) { + batch.write(generateRecord(cols, types)); + } + if (rand.nextFloat() < abortPct) batch.abort(); + else + batch.commit(); + if(pause) { + System.out.println("Writer " + writerNumber + " committed... press Enter to continue. " + Thread.currentThread().getId()); + System.in.read(); + } + } + long end = System.currentTimeMillis(); + if (end - start < frequency) Thread.sleep(frequency - (end - start)); + } finally { + batch.close(); + if(pause) { + System.out.println("Writer " + writerNumber + " has closed a Batch.. press Enter to continue. " + Thread.currentThread().getId()); + System.in.read(); + } + } + } + } catch (Throwable t) { + System.err.println("Writer number " + writerNumber + " caught exception while testing: " + + StringUtils.stringifyException(t)); + } finally { + if(conn!=null) conn.close(); + } + } + + private byte[] generateRecord(String[] cols, String[] types) { + // TODO make it so I can randomize the column order + + StringBuilder buf = new StringBuilder(); + for (int i = 0; i < types.length; i++) { + buf.append(generateColumn(types[i])); + buf.append(","); + } + return buf.toString().getBytes(); + } + + private String generateColumn(String type) { + if ("string".equals(type.toLowerCase())) { + return "When that Aprilis with his showers swoot"; + } else if (type.toLowerCase().startsWith("int")) { + return "42"; + } else if (type.toLowerCase().startsWith("dec") || type.toLowerCase().equals("float")) { + return "3.141592654"; + } else if (type.toLowerCase().equals("datetime")) { + return "2014-03-07 15:33:22"; + } else if (type.toLowerCase().equals("date")) { + return "1955-11-12"; + } else { + throw new RuntimeException("Sorry, I don't know the type " + type); + } + } + } +} diff --git streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java new file mode 100644 index 0000000..b90f95d --- /dev/null +++ streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import com.google.common.collect.Lists; +import junit.framework.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; + +public class TestDelimitedInputWriter { + @Test + public void testFieldReordering() throws Exception { + + ArrayList colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"}); + {//1) test dropping fields - first middle & last + String[] fieldNames = {null, "col2", null, "col4", null}; + int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames); + Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1})); + } + + {//2) test reordering + String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"}; + int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames); + Assert.assertTrue( Arrays.equals(mapping, new int[]{4,3,2,1,0}) ); + } + + {//3) test bad field names + String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"}; + try { + DelimitedInputWriter.getFieldReordering(fieldNames, colNames); + Assert.fail(); + } catch (InvalidColumn e) { + // should throw + } + } + + {//4) test few field names + String[] fieldNames = {"col3", "col4"}; + int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames); + Assert.assertTrue( Arrays.equals(mapping, new int[]{2,3}) ); + } + + {//5) test extra field names + String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"}; + try { + DelimitedInputWriter.getFieldReordering(fieldNames, colNames); + Assert.fail(); + } catch (InvalidColumn e) { + //show throw + } + } + } +} diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java new file mode 100644 index 0000000..c5325c3 --- /dev/null +++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -0,0 +1,823 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +import junit.framework.Assert; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Shell; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class TestStreaming { + + public static class RawFileSystem extends RawLocalFileSystem { + private static final URI NAME; + static { + try { + NAME = new URI("raw:///"); + } catch (URISyntaxException se) { + throw new IllegalArgumentException("bad uri", se); + } + } + + @Override + public URI getUri() { + return NAME; + } + + static String execCommand(File f, String... 
cmd) throws IOException { + String[] args = new String[cmd.length + 1]; + System.arraycopy(cmd, 0, args, 0, cmd.length); + args[cmd.length] = f.getCanonicalPath(); + String output = Shell.execCommand(args); + return output; + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + File file = pathToFile(path); + if (!file.exists()) { + throw new FileNotFoundException("Can't find " + path); + } + // get close enough + short mod = 0; + if (file.canRead()) { + mod |= 0444; + } + if (file.canWrite()) { + mod |= 0200; + } + if (file.canExecute()) { + mod |= 0111; + } + return new FileStatus(file.length(), file.isDirectory(), 1, 1024, + file.lastModified(), file.lastModified(), + FsPermission.createImmutable(mod), "owen", "users", path); + } + } + + private static final String COL1 = "id"; + private static final String COL2 = "msg"; + + private final HiveConf conf; + private final IMetaStoreClient msClient; + + final String metaStoreURI = null; + + // partitioned table + private final static String dbName = "testing"; + private final static String tblName = "alerts"; + private final static String[] fieldNames = new String[]{COL1,COL2}; + List partitionVals; + private static String partLocation; + + // unpartitioned table + private final static String dbName2 = "testing"; + private final static String tblName2 = "alerts"; + private final static String[] fieldNames2 = new String[]{COL1,COL2}; + + private final String PART1_CONTINENT = "Asia"; + private final String PART1_COUNTRY = "India"; + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + + public TestStreaming() throws Exception { + partitionVals = new ArrayList(2); + partitionVals.add(PART1_CONTINENT); + partitionVals.add(PART1_COUNTRY); + + //port = MetaStoreUtils.findFreePort(); + + conf = new HiveConf(this.getClass()); + conf.set("fs.raw.impl", RawFileSystem.class.getName()); + TxnDbUtil.setConfValues(conf); + if(metaStoreURI!=null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI); + } + conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + + //1) Start from a clean slate (metastore) + TxnDbUtil.cleanDb(); + TxnDbUtil.prepDb(); + + //2) obtain metastore clients + msClient = new HiveMetaStoreClient(conf); + //SessionState.start(new CliSessionState(conf)); + //driver = new Driver(conf); + } + + @Before + public void setup() throws Exception { + // drop and recreate the necessary databases and tables + dropDB(msClient, dbName); + createDbAndTable(msClient, dbName, tblName, partitionVals); + + dropDB(msClient, dbName2); + createDbAndTable(msClient, dbName2, tblName2, partitionVals); + } + + private void printResults(ArrayList res) { + for(String s: res) { + System.out.println(s); + } + System.out.println("Total records: " + res.size()); + } + + private static List getPartitionKeys() { + List fields = new ArrayList(); + // Defining partition names in unsorted order + fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, "")); + fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, "")); + return fields; + } + + private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles, + String... 
records) throws Exception { + ValidTxnList txns = msClient.getValidTxns(); + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns); + Assert.assertEquals(0, dir.getObsolete().size()); + Assert.assertEquals(0, dir.getOriginalFiles().size()); + List current = dir.getCurrentDirectories(); + System.out.println("Files found: "); + for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString()); + Assert.assertEquals(numExpectedFiles, current.size()); + + // find the absolute mininum transaction + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (AcidUtils.ParsedDelta pd : current) { + if (pd.getMaxTransaction() > max) max = pd.getMaxTransaction(); + if (pd.getMinTransaction() < min) min = pd.getMinTransaction(); + } + Assert.assertEquals(minTxn, min); + Assert.assertEquals(maxTxn, max); + + InputFormat inf = new OrcInputFormat(); + JobConf job = new JobConf(); + job.set("mapred.input.dir", partLocation.toString()); + job.set("bucket_count", Integer.toString(buckets)); + job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); + InputSplit[] splits = inf.getSplits(job, 1); + Assert.assertEquals(1, splits.length); + org.apache.hadoop.mapred.RecordReader rr = + inf.getRecordReader(splits[0], job, Reporter.NULL); + + NullWritable key = rr.createKey(); + OrcStruct value = rr.createValue(); + for(int i = 0; i < records.length; i++) { + Assert.assertEquals(true, rr.next(key, value)); + Assert.assertEquals(records[i], value.toString()); + } + Assert.assertEquals(false, rr.next(key, value)); + } + + private void checkNothingWritten() throws Exception { + ValidTxnList txns = msClient.getValidTxns(); + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns); + Assert.assertEquals(0, dir.getObsolete().size()); + Assert.assertEquals(0, dir.getOriginalFiles().size()); + List current = dir.getCurrentDirectories(); + Assert.assertEquals(0, current.size()); + } + + @Test + public void testEndpointConnection() throws Exception { + // 1) Basic + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName + , partitionVals); + StreamingConnection connection = endPt.newConnection(false); //shouldn't throw + connection.close(); + + // 2) Leave partition unspecified + endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null); + endPt.newConnection(false).close(); // should not throw + } + + @Test + public void testAddPartition() throws Exception { + List newPartVals = new ArrayList(2); + newPartVals.add(PART1_CONTINENT); + newPartVals.add("Nepal"); + + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName + , newPartVals); + + // Ensure partition is absent + try { + msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals); + Assert.assertTrue("Partition already exists", false); + } catch (NoSuchObjectException e) { + // expect this exception + } + + // Create partition + Assert.assertNotNull(endPt.newConnection(true)); + + // Ensure partition is present + Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals); + Assert.assertNotNull("Did not find added partition", p); + } + + @Test + public void testTransactionBatchEmptyCommit() throws Exception { + // 1) to partitioned table + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = 
connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.commit(); + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + txnBatch.close(); + connection.close(); + + // 2) To unpartitioned table + endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); + writer = new DelimitedInputWriter(fieldNames2,",", endPt); + connection = endPt.newConnection(false); + + txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.commit(); + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + txnBatch.close(); + connection.close(); + } + + @Test + public void testTransactionBatchEmptyAbort() throws Exception { + // 1) to partitioned table + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(true); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.abort(); + Assert.assertEquals(TransactionBatch.TxnState.ABORTED + , txnBatch.getCurrentTransactionState()); + txnBatch.close(); + connection.close(); + + // 2) to unpartitioned table + endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); + writer = new DelimitedInputWriter(fieldNames,",", endPt); + connection = endPt.newConnection(true); + + txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.abort(); + Assert.assertEquals(TransactionBatch.TxnState.ABORTED + , txnBatch.getCurrentTransactionState()); + txnBatch.close(); + connection.close(); + } + + @Test + public void testTransactionBatchCommit_Delimited() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(true); + + // 1st Txn + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.commit(); + + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + + // 2nd Txn + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("2,Welcome to streaming".getBytes()); + + // data should not be visible + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + + txnBatch.commit(); + + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}", + "{2, Welcome to streaming}"); + + txnBatch.close(); + Assert.assertEquals(TransactionBatch.TxnState.INACTIVE + , txnBatch.getCurrentTransactionState()); + + + connection.close(); + + + // To Unpartitioned table + endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); + writer = new DelimitedInputWriter(fieldNames,",", endPt); + connection = endPt.newConnection(true); + + // 1st Txn + txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("1,Hello 
streaming".getBytes()); + txnBatch.commit(); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + connection.close(); + } + + @Test + public void testTransactionBatchCommit_Json() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + StrictJsonWriter writer = new StrictJsonWriter(endPt); + StreamingConnection connection = endPt.newConnection(true); + + // 1st Txn + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}"; + txnBatch.write(rec1.getBytes()); + txnBatch.commit(); + + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + + txnBatch.close(); + Assert.assertEquals(TransactionBatch.TxnState.INACTIVE + , txnBatch.getCurrentTransactionState()); + + connection.close(); + } + + @Test + public void testRemainingTransactions() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(true); + + // 1) test with txn.Commit() + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + int batch=0; + int initialCount = txnBatch.remainingTransactions(); + while(txnBatch.remainingTransactions()>0) { + txnBatch.beginNextTransaction(); + Assert.assertEquals(--initialCount, txnBatch.remainingTransactions()); + for (int rec=0; rec<2; ++rec) { + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write((batch * rec + ",Hello streaming").getBytes()); + } + txnBatch.commit(); + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + ++batch; + } + Assert.assertEquals(0, txnBatch.remainingTransactions()); + txnBatch.close(); + + Assert.assertEquals(TransactionBatch.TxnState.INACTIVE + , txnBatch.getCurrentTransactionState()); + + // 2) test with txn.Abort() + txnBatch = connection.fetchTransactionBatch(10, writer); + batch=0; + initialCount = txnBatch.remainingTransactions(); + while(txnBatch.remainingTransactions()>0) { + txnBatch.beginNextTransaction(); + Assert.assertEquals(--initialCount,txnBatch.remainingTransactions()); + for (int rec=0; rec<2; ++rec) { + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write((batch * rec + ",Hello streaming").getBytes()); + } + txnBatch.abort(); + Assert.assertEquals(TransactionBatch.TxnState.ABORTED + , txnBatch.getCurrentTransactionState()); + ++batch; + } + Assert.assertEquals(0,txnBatch.remainingTransactions()); + txnBatch.close(); + + Assert.assertEquals(TransactionBatch.TxnState.INACTIVE + , txnBatch.getCurrentTransactionState()); + + connection.close(); + } + + @Test + public void testTransactionBatchAbort() throws Exception { + + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + 
txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.write("2,Welcome to streaming".getBytes()); + txnBatch.abort(); + + checkNothingWritten(); + + Assert.assertEquals(TransactionBatch.TxnState.ABORTED + , txnBatch.getCurrentTransactionState()); + + txnBatch.close(); + connection.close(); + + checkNothingWritten(); + + } + + + @Test + public void testTransactionBatchAbortAndCommit() throws Exception { + + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.write("2,Welcome to streaming".getBytes()); + txnBatch.abort(); + + checkNothingWritten(); + + Assert.assertEquals(TransactionBatch.TxnState.ABORTED + , txnBatch.getCurrentTransactionState()); + + txnBatch.beginNextTransaction(); + txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.write("2,Welcome to streaming".getBytes()); + txnBatch.commit(); + + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}", + "{2, Welcome to streaming}"); + + txnBatch.close(); + connection.close(); + } + + @Test + public void testMultipleTransactionBatchCommits() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); + StreamingConnection connection = endPt.newConnection(true); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.commit(); + + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + + txnBatch.beginNextTransaction(); + txnBatch.write("2,Welcome to streaming".getBytes()); + txnBatch.commit(); + + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}", + "{2, Welcome to streaming}"); + + txnBatch.close(); + + // 2nd Txn Batch + txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("3,Hello streaming - once again".getBytes()); + txnBatch.commit(); + + checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", + "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); + + txnBatch.beginNextTransaction(); + txnBatch.write("4,Welcome to streaming - once again".getBytes()); + txnBatch.commit(); + + checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", + "{2, Welcome to streaming}", "{3, Hello streaming - once again}", + "{4, Welcome to streaming - once again}"); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + + txnBatch.close(); + + connection.close(); + } + + + @Test + public void testInterleavedTransactionBatchCommits() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + // Acquire 1st Txn Batch + TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, writer); + txnBatch1.beginNextTransaction(); + + // Acquire 2nd Txn Batch + DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt); + TransactionBatch txnBatch2 = connection.fetchTransactionBatch(10, writer2); + 
txnBatch2.beginNextTransaction(); + + // Interleaved writes to both batches + txnBatch1.write("1,Hello streaming".getBytes()); + txnBatch2.write("3,Hello streaming - once again".getBytes()); + + checkNothingWritten(); + + txnBatch2.commit(); + + checkDataWritten(11, 20, 1, 1, "{3, Hello streaming - once again}"); + + txnBatch1.commit(); + + checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + + txnBatch1.beginNextTransaction(); + txnBatch1.write("2,Welcome to streaming".getBytes()); + + txnBatch2.beginNextTransaction(); + txnBatch2.write("4,Welcome to streaming - once again".getBytes()); + + checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + + txnBatch1.commit(); + + checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", + "{2, Welcome to streaming}", + "{3, Hello streaming - once again}"); + + txnBatch2.commit(); + + checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", + "{2, Welcome to streaming}", + "{3, Hello streaming - once again}", + "{4, Welcome to streaming - once again}"); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch1.getCurrentTransactionState()); + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch2.getCurrentTransactionState()); + + txnBatch1.close(); + txnBatch2.close(); + + connection.close(); + } + + class WriterThd extends Thread { + + private StreamingConnection conn; + private HiveEndPoint ep; + private DelimitedInputWriter writer; + private String data; + + WriterThd(HiveEndPoint ep, String data) throws Exception { + this.ep = ep; + writer = new DelimitedInputWriter(fieldNames, ",", ep); + conn = ep.newConnection(false); + this.data = data; + } + + WriterThd(StreamingConnection conn, HiveEndPoint ep, DelimitedInputWriter writer, String data) { + this.conn = conn; + this.ep = ep; + this.writer = writer; + this.data = data; + } + @Override + public void run() { + TransactionBatch txnBatch = null; + try { + txnBatch = conn.fetchTransactionBatch(1000, writer); + while(txnBatch.remainingTransactions() > 0) { + txnBatch.beginNextTransaction(); + txnBatch.write(data.getBytes()); + txnBatch.write(data.getBytes()); + txnBatch.commit(); + } // while + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (txnBatch != null) { + try { + txnBatch.close(); + } catch (Exception e) { + conn.close(); + throw new RuntimeException(e); + } + } + try { + conn.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + } + } + + @Test + public void testConcurrentTransactionBatchCommits() throws Exception { + final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); + WriterThd t1 = new WriterThd(ep, "1,Matrix"); + WriterThd t2 = new WriterThd(ep, "2,Gandhi"); + WriterThd t3 = new WriterThd(ep, "3,Silence"); + + t1.start(); + t2.start(); + t3.start(); + + t1.join(); + t2.join(); + t3.join(); + + } + + // delete db and all tables in it + public static void dropDB(IMetaStoreClient client, String databaseName) { + try { + for(String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) { + client.dropTable(databaseName, table, true, true); + } + client.dropDatabase(databaseName); + } catch (TException e) { + } + + } + + public void createDbAndTable(IMetaStoreClient client, String databaseName, + String tableName, List partVals) + throws Exception { + Database db = new Database(); + db.setName(databaseName); + String dbLocation = "raw://" + dbFolder.newFolder(databaseName + 
".db").getCanonicalPath(); + db.setLocationUri(dbLocation); + client.createDatabase(db); + + Table tbl = new Table(); + tbl.setDbName(databaseName); + tbl.setTableName(tableName); + tbl.setTableType(TableType.MANAGED_TABLE.toString()); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(getTableColumns()); + sd.setNumBuckets(1); + sd.setLocation(dbLocation + Path.SEPARATOR + tableName); + tbl.setPartitionKeys(getPartitionKeys()); + + tbl.setSd(sd); + + sd.setBucketCols(new ArrayList(2)); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tbl.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1"); + + sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName()); + sd.setInputFormat(HiveInputFormat.class.getName()); + sd.setOutputFormat(OrcOutputFormat.class.getName()); + + Map tableParams = new HashMap(); + tbl.setParameters(tableParams); + client.createTable(tbl); + + try { + addPartition(client, tbl, partVals); + } catch (AlreadyExistsException e) { + } + Partition createdPartition = client.getPartition(databaseName, tableName, partVals); + partLocation = createdPartition.getSd().getLocation(); + } + + private static void addPartition(IMetaStoreClient client, Table tbl + , List partValues) + throws IOException, TException { + Partition part = new Partition(); + part.setDbName(tbl.getDbName()); + part.setTableName(tblName); + StorageDescriptor sd = new StorageDescriptor(tbl.getSd()); + sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys(), partValues)); + part.setSd(sd); + part.setValues(partValues); + client.add_partition(part); + } + + private static String makePartPath(List partKeys, List partVals) { + if(partKeys.size()!=partVals.size()) { + throw new IllegalArgumentException("Partition values:" + partVals + ", does not match the partition Keys in table :" + partKeys ); + } + StringBuffer buff = new StringBuffer(partKeys.size()*20); + buff.append(" ( "); + int i=0; + for(FieldSchema schema : partKeys) { + buff.append(schema.getName()); + buff.append("='"); + buff.append(partVals.get(i)); + buff.append("'"); + if(i!=partKeys.size()-1) { + buff.append(Path.SEPARATOR); + } + ++i; + } + buff.append(" )"); + return buff.toString(); + } + + + private static List getTableColumns() { + List fields = new ArrayList(); + fields.add(new FieldSchema(COL1, serdeConstants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, "")); + return fields; + } +} diff --git streaming/src/test/sit streaming/src/test/sit new file mode 100644 index 0000000..8116724 --- /dev/null +++ streaming/src/test/sit @@ -0,0 +1,34 @@ +#!/bin/sh + +if [ "${HADOOP_HOME}x" == "x" ] + then + echo "Please set HADOOP_HOME"; + exit 1 +fi + +if [ "${HIVE_HOME}x" == "x" ] + then + echo "Please set HIVE_HOME"; + exit 1 +fi + +if [ "${JAVA_HOME}x" == "x" ] + then + echo "Please set JAVA_HOME"; + exit 1 +fi + +for jar in ${HADOOP_HOME}/client/*.jar + do + CLASSPATH=${CLASSPATH}:$jar +done + +for jar in ${HIVE_HOME}/lib/*.jar + do + CLASSPATH=${CLASSPATH}:$jar +done + +CLASSPATH=${CLASSPATH}:${HADOOP_HOME}/conf +# don't put hive-site in the classpath + +$JAVA_HOME/bin/java -cp ${CLASSPATH} org.apache.hive.streaming.StreamingIntegrationTester $@