diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 9e90d36dae..f940487380 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -46,9 +46,11 @@ import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -66,6 +68,7 @@ private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName()); private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]"; + private Integer statementId; protected HiveConf conf; protected StreamingConnection conn; protected Table table; @@ -75,6 +78,7 @@ protected Map> updaters = new HashMap<>(); protected Map partitionPaths = new HashMap<>(); protected Set addedPartitions = new HashSet<>(); + protected Set undeterminedPartitions = new HashSet<>(); // input OI includes table columns + partition columns protected StructObjectInspector inputRowObjectInspector; // output OI strips off the partition columns and retains other columns @@ -128,13 +132,21 @@ public void memoryUsageAboveThreshold(final long usedMemory, final long maxMemor } @Override - public void init(StreamingConnection conn, long minWriteId, long maxWriteId) throws StreamingException { + public void init(StreamingConnection conn, long minWriteId, long maxWriteId) + throws StreamingException { + init(conn, minWriteId, maxWriteId, -1); + } + + @Override + public void init(StreamingConnection conn, long minWriteId, long maxWriteId, + int statementId) throws StreamingException { if (conn == null) { throw new StreamingException("Streaming connection cannot be null during record writer initialization"); } this.conn = conn; this.curBatchMinWriteId = minWriteId; this.curBatchMaxWriteId = maxWriteId; + this.statementId = statementId; this.conf = conn.getHiveConf(); this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME); this.table = conn.getTable(); @@ -308,7 +320,7 @@ protected int getBucket(Object row) { if (!conn.isPartitionedTable()) { return null; } - List partitionValues = new ArrayList<>(); + List partitionValues = new ArrayList<>(); if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) { Object[] partitionFields = getPartitionFields(row); for (int i = 0; i < partitionObjInspectors.length; i++) { @@ -431,6 +443,7 @@ public void write(final long writeId, final byte[] record) throws StreamingExcep int bucket = getBucket(encodedRow); List partitionValues = getPartitionValues(encodedRow); getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow); + // ingest size bytes gets resetted on flush() whereas connection stats is not conn.getConnectionStats().incrementRecordsWritten(); conn.getConnectionStats().incrementRecordsSize(record.length); @@ -476,6 +489,11 @@ protected void checkAutoFlush() throws StreamingIOFailure { return addedPartitions; } + @Override + public Set 
getUndeterminedPartitions() { + return undeterminedPartitions; + } + protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId, Long maxWriteID) throws IOException { @@ -492,10 +510,53 @@ protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucket .tableProperties(tblProperties) .minimumWriteId(minWriteId) .maximumWriteId(maxWriteID) - .statementId(-1) + .statementId(statementId) .finalDestination(partitionPath)); } + /** + * Returns the file that would be used to store rows under this. + * parameters + * @param partitionValues partition values + * @param bucketId bucket id + * @param minWriteId min write Id + * @param maxWriteId max write Id + * @param statementId statement Id + * @param table table + * @return the location of the file. + * @throws StreamingException when the path is not found + */ + @Override + public Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId, + Table table) throws StreamingException { + Path destLocation; + if (partitionValues == null) { + destLocation = new Path(table.getSd().getLocation()); + } else { + Map partSpec = Warehouse.makeSpecFromValues( + table.getPartitionKeys(), partitionValues); + try { + destLocation = new Path(table.getDataLocation(), Warehouse.makePartPath(partSpec)); + } catch (MetaException e) { + throw new StreamingException("Unable to retrieve the delta file location" + + " for values: " + partitionValues + + ", minWriteId: " + minWriteId + + ", maxWriteId: " + maxWriteId + + ", statementId: " + statementId, e); + } + } + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .filesystem(fs) + .inspector(outputRowObjectInspector) + .bucket(bucketId) + .minimumWriteId(minWriteId) + .maximumWriteId(maxWriteId) + .statementId(statementId) + .finalDestination(destLocation); + return AcidUtils.createFilename(destLocation, options); + } + protected RecordUpdater getRecordUpdater(List partitionValues, int bucketId) throws StreamingIOFailure { RecordUpdater recordUpdater; String key; @@ -514,15 +575,27 @@ protected RecordUpdater getRecordUpdater(List partitionValues, int bucke PartitionInfo partitionInfo = conn.createPartitionIfNotExists(partitionValues); // collect the newly added partitions. 
connection.commitTransaction() will report the dynamically added // partitions to TxnHandler - if (!partitionInfo.isExists()) { + switch(partitionInfo.getExists()) { + case NOT_EXISTS: addedPartitions.add(partitionInfo.getName()); if (LOG.isDebugEnabled()) { LOG.debug("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName); } - } else { + break; + case EXISTS: if (LOG.isDebugEnabled()) { LOG.debug("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName); } + break; + case UNKNOWN: + undeterminedPartitions.add(partitionInfo.getName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Couldn't determine if partition with values {} for" + + " table {} exists", partitionValues, + fullyQualifiedTableName); + } + break; + default: } destLocation = new Path(partitionInfo.getPartitionLocation()); } diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java new file mode 100644 index 0000000000..47d127afe7 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hive.streaming.transaction.InvalidTransactionState; +import org.apache.hive.streaming.transaction.StreamingTransaction; +import org.apache.hive.streaming.transaction.TxnState; + +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Common methods for the implementing classes. + */ +abstract class AbstractStreamingTransaction + implements StreamingTransaction { + + /** + * This variable should be initialized by the children. + */ + protected RecordWriter recordWriter; + + /** + * This variable should be initialized by the children. + */ + protected List txnToWriteIds; + + + /** + * once any operation on this batch encounters a system exception + * (e.g. IOException on write) it's safest to assume that we can't write to the + * file backing this batch any more. 
This guards important public methods + */ + protected final AtomicBoolean isTxnClosed = new AtomicBoolean(false); + + protected int currentTxnIndex = -1; + protected TxnState state; + + + protected void checkIsClosed() throws StreamingException { + if (isTxnClosed.get()) { + throw new StreamingException("Transaction" + toString() + " is closed()"); + } + } + + protected void beginNextTransactionImpl(String errorMessage) + throws StreamingException{ + state = TxnState.INACTIVE; //clear state from previous txn + if ((currentTxnIndex + 1) >= txnToWriteIds.size()) { + throw new InvalidTransactionState(errorMessage); + } + currentTxnIndex++; + state = TxnState.OPEN; + } + + public void write(final byte[] record) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), record); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won't help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } finally { + markDead(success); + } + } + + public void write(final InputStream inputStream) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), inputStream); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won'table help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } finally { + markDead(success); + } + } + + /** + * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue + * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail). + * This ensures that a client can't ignore these failures and continue to write. 
+ */ + protected void markDead(boolean success) throws StreamingException { + if (success) { + return; + } + close(); + } + + public long getCurrentWriteId() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.get(currentTxnIndex).getWriteId(); + } + return -1L; + } + + public int remainingTransactions() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.size() - currentTxnIndex - 1; + } + return txnToWriteIds.size(); + } + + public boolean isClosed() { + return isTxnClosed.get(); + } + + public TxnState getCurrentTransactionState() { + return state; + } + + public long getCurrentTxnId() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.get(currentTxnIndex).getTxnId(); + } + return -1L; + } + + @Override + public List getTxnToWriteIds() { + return txnToWriteIds; + } + + public void commit() throws StreamingException { + commitWithPartitions(null); + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 6cf14b064f..9c563066ab 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -23,55 +23,40 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; - +import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.LockComponentBuilder; -import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.hive.streaming.transaction.StreamingTransaction; +import org.apache.hive.streaming.transaction.TxnState; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Streaming connection implementation for hive. To create a streaming connection, use the builder API @@ -119,24 +104,9 @@ private static final Logger LOG = LoggerFactory.getLogger(HiveStreamingConnection.class.getName()); private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083"; - private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1; - private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000; + private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 10; private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true; - public enum TxnState { - INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"); - - private final String code; - - TxnState(String code) { - this.code = code; - } - - public String toString() { - return code; - } - } - // fields populated from builder private String database; private String table; @@ -144,13 +114,13 @@ public String toString() { private String agentInfo; private int transactionBatchSize; private RecordWriter recordWriter; - private TransactionBatch currentTransactionBatch; + private StreamingTransaction currentTransactionBatch; private HiveConf conf; private boolean streamingOptimizations; private AtomicBoolean isConnectionClosed = new AtomicBoolean(false); // internal fields - private boolean isPartitionedTable; + private Boolean isPartitionedTable; private IMetaStoreClient msClient; private IMetaStoreClient heartbeatMSClient; private final String username; @@ -158,6 +128,11 @@ public String toString() { private Table tableObject = null; private String metastoreUri; private ConnectionStats connectionStats; + private final Long writeId; + private final Integer statementId; + private boolean manageTransactions; + private int countTransactions = 0; + private List undeterminedPartitions; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -166,6 +141,12 @@ private HiveStreamingConnection(Builder builder) throws StreamingException { this.conf = builder.hiveConf; this.agentInfo = builder.agentInfo; this.streamingOptimizations = builder.streamingOptimizations; + this.writeId = builder.writeId; + this.statementId = builder.statementId; + this.tableObject = builder.tableObject; + this.setPartitionedTable(builder.isPartitioned); + this.manageTransactions = builder.manageTransactions; + UserGroupInformation loggedInUser = null; try { loggedInUser = UserGroupInformation.getLoginUser(); @@ -193,13 +174,18 @@ private HiveStreamingConnection(Builder builder) throws StreamingException { if (conf == null) { conf = createHiveConf(this.getClass(), DEFAULT_METASTORE_URI); } + overrideConfSettings(conf); - this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()); - this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection"); - // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are - // isolated from the other transaction related RPC calls. 
- this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection-heartbeat"); - validateTable(); + if (manageTransactions) { + this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()); + this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, + "streaming-connection"); + // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are + // isolated from the other transaction related RPC calls. + this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, + "streaming-connection-heartbeat"); + validateTable(); + } LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString()); } @@ -217,6 +203,11 @@ public static Builder newBuilder() { private int transactionBatchSize = DEFAULT_TRANSACTION_BATCH_SIZE; private boolean streamingOptimizations = DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED; private RecordWriter recordWriter; + private Long writeId; + private Integer statementId = -1; + private boolean manageTransactions = true; + private Table tableObject; + private Boolean isPartitioned; /** * Specify database to use for streaming connection. @@ -314,6 +305,44 @@ public Builder withRecordWriter(final RecordWriter recordWriter) { return this; } + /** + * Specify this parameter if we want the current connection + * to join an ongoing transaction without having to query + * the metastore to create it. + * @param writeId write id + * @return builder + */ + public Builder withWriteId(final long writeId) { + this.writeId = writeId; + manageTransactions = false; + return this; + } + + /** + * Specify this parameter to set an statement id in the writer. + * This really only makes sense to be specified when a writeId is + * provided as well + * @param statementId statement id + * @return builder + */ + public Builder withStatementId(final int statementId) { + this.statementId = statementId; + return this; + } + + /** + * Specify the table object since sometimes no connections + * to the metastore will be opened. + * @param table table object. + * @return builder + */ + public Builder withTableObject(Table table) { + this.tableObject = table; + this.isPartitioned = tableObject.getPartitionKeys() != null + && !tableObject.getPartitionKeys().isEmpty(); + return this; + } + /** * Returning a streaming connection to hive. 
* @@ -324,11 +353,27 @@ public HiveStreamingConnection connect() throws StreamingException { throw new StreamingException("Database cannot be null for streaming connection"); } if (table == null) { - throw new StreamingException("Table cannot be null for streaming connection"); + if (tableObject == null) { + throw new StreamingException("Table and table object cannot be " + + "null for streaming connection"); + } else { + table = tableObject.getTableName(); + } + } + + if (tableObject != null && !tableObject.getTableName().equals(table)) { + throw new StreamingException("Table must match tableObject table name"); } + if (recordWriter == null) { throw new StreamingException("Record writer cannot be null for streaming connection"); } + if ((writeId != null && tableObject == null) || + (writeId == null && tableObject != null)){ + throw new StreamingException("If writeId is set, tableObject " + + "must be set as well and vice versa"); + } + HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this); // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before // filesystem close (to avoid ClosedChannelException) @@ -338,7 +383,7 @@ public HiveStreamingConnection connect() throws StreamingException { } } - private void setPartitionedTable(boolean isPartitionedTable) { + private void setPartitionedTable(Boolean isPartitionedTable) { this.isPartitionedTable = isPartitionedTable; } @@ -356,7 +401,8 @@ private String toConnectionInfoString() { "username: " + username + ", " + "secure-mode: " + secureMode + ", " + "record-writer: " + recordWriter.getClass().getSimpleName() + ", " + - "agent-info: " + agentInfo + " }"; + "agent-info: " + agentInfo + ", " + + "writeId: " + writeId + " }"; } @VisibleForTesting @@ -369,6 +415,7 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu String partLocation = null; String partName = null; boolean exists = false; + try { Map partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues); AddPartitionDesc addPartitionDesc = new AddPartitionDesc(database, table, true); @@ -376,6 +423,11 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString(); addPartitionDesc.addPartition(partSpec, partLocation); Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf); + + if (getMSC() == null) { + return new PartitionInfo(partName, partLocation, PartitionInfo.Exists.UNKNOWN); + } + getMSC().add_partition(partition); } catch (AlreadyExistsException e) { exists = true; @@ -383,7 +435,27 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu throw new StreamingException("Unable to creation partition for values: " + partitionValues + " connection: " + toConnectionInfoString(), e); } - return new PartitionInfo(partName, partLocation, exists); + return new PartitionInfo(partName, partLocation, + exists ? PartitionInfo.Exists.EXISTS: PartitionInfo.Exists.NOT_EXISTS); + } + + /** + * Returns the file that would be used to store rows under this. + * parameters + * @param partitionValues partition values + * @param bucketId bucket id + * @param minWriteId min write Id + * @param maxWriteId max write Id + * @param statementId statement Id + * @return the location of the file. 
+ * @throws StreamingException when the path is not found + */ + @Override + public Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId) + throws StreamingException { + return recordWriter.getDeltaFileLocation(partitionValues, + bucketId, minWriteId, maxWriteId, statementId, tableObject); } IMetaStoreClient getMSC() { @@ -424,43 +496,6 @@ private void validateTable() throws InvalidTable, ConnectionError { } } - private static class HeartbeatRunnable implements Runnable { - private final HiveStreamingConnection conn; - private final AtomicLong minTxnId; - private final long maxTxnId; - private final ReentrantLock transactionLock; - private final AtomicBoolean isTxnClosed; - - HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId, - final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) { - this.conn = conn; - this.minTxnId = minTxnId; - this.maxTxnId = maxTxnId; - this.transactionLock = transactionLock; - this.isTxnClosed = isTxnClosed; - } - - @Override - public void run() { - transactionLock.lock(); - try { - if (minTxnId.get() > 0) { - HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId); - if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { - LOG.error("Heartbeat failure: {}", resp.toString()); - isTxnClosed.set(true); - } else { - LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId); - } - } - } catch (TException e) { - LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e); - } finally { - transactionLock.unlock(); - } - } - } - private void beginNextTransaction() throws StreamingException { if (currentTransactionBatch == null) { currentTransactionBatch = createNewTransactionBatch(); @@ -481,8 +516,18 @@ private void beginNextTransaction() throws StreamingException { currentTransactionBatch.beginNextTransaction(); } - private TransactionBatch createNewTransactionBatch() throws StreamingException { - return new TransactionBatch(this); + private StreamingTransaction createNewTransactionBatch() throws StreamingException { + countTransactions++; + if (manageTransactions) { + return new TransactionBatch(this); + } else { + if (countTransactions > 1) { + throw new StreamingException("If a writeId is passed for the " + + "construction of HiveStreaming only one transaction batch" + + " can be done"); + } + return new UnManagedSingleTransaction(this); + } } private void checkClosedState() throws StreamingException { @@ -496,7 +541,7 @@ private void checkState() throws StreamingException { if (currentTransactionBatch == null) { throw new StreamingException("Transaction batch is null. Missing beginTransaction?"); } - if (currentTransactionBatch.state != TxnState.OPEN) { + if (currentTransactionBatch.getCurrentTransactionState() != TxnState.OPEN) { throw new StreamingException("Transaction state is not OPEN. 
Missing beginTransaction?"); } } @@ -504,13 +549,45 @@ private void checkState() throws StreamingException { @Override public void beginTransaction() throws StreamingException { checkClosedState(); + undeterminedPartitions = Lists.newArrayList(); beginNextTransaction(); } @Override public void commitTransaction() throws StreamingException { + commitTransactionWithPartition(null); + } + + @Override + public void commitTransactionWithPartition(Set partitions) + throws StreamingException { checkState(); - currentTransactionBatch.commit(); + + Set createdPartitions = new HashSet<>(); + if (partitions != null) { + for (String partition: partitions) { + try { + PartitionInfo info = createPartitionIfNotExists( + Warehouse.getPartValuesFromPartName(partition)); + switch (info.getExists()) { + case UNKNOWN: + throw new StreamingException("Couldn't determine if partition" + + " " + partition +" exists or not"); + case NOT_EXISTS: + createdPartitions.add(partition); + break; + default: + } + + } catch (MetaException e) { + throw new StreamingException("Partition " + partition + " is invalid.", e); + } + } + } + + currentTransactionBatch.commitWithPartitions(createdPartitions); + undeterminedPartitions.addAll( + currentTransactionBatch.getUnderminedPartitions()); connectionStats.incrementCommittedTransactions(); } @@ -549,8 +626,10 @@ public void close() { } catch (StreamingException e) { LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e); } finally { - getMSC().close(); - getHeatbeatMSC().close(); + if (manageTransactions) { + getMSC().close(); + getHeatbeatMSC().close(); + } } if (LOG.isInfoEnabled()) { LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats()); @@ -595,418 +674,6 @@ Long getCurrentTxnId() { return currentTransactionBatch.getCurrentTxnId(); } - private static class TransactionBatch { - private String username; - private HiveStreamingConnection conn; - private ScheduledExecutorService scheduledExecutorService; - private RecordWriter recordWriter; - private String partNameForLock = null; - private List txnToWriteIds; - private int currentTxnIndex = -1; - private TxnState state; - private LockRequest lockRequest = null; - // heartbeats can only be sent for open transactions. - // there is a race between committing/aborting a transaction and heartbeat. - // Example: If a heartbeat is sent for committed txn, exception will be thrown. - // Similarly if we don't send a heartbeat, metastore server might abort a txn - // for missed heartbeat right before commit txn call. - // This lock is used to mutex commit/abort and heartbeat calls - private final ReentrantLock transactionLock = new ReentrantLock(); - // min txn id is incremented linearly within a transaction batch. - // keeping minTxnId atomic as it is shared with heartbeat thread - private final AtomicLong minTxnId; - // max txn id does not change for a transaction batch - private final long maxTxnId; - - /** - * once any operation on this batch encounters a system exception - * (e.g. IOException on write) it's safest to assume that we can't write to the - * file backing this batch any more. 
This guards important public methods - */ - private final AtomicBoolean isTxnClosed = new AtomicBoolean(false); - private String agentInfo; - private int numTxns; - /** - * Tracks the state of each transaction - */ - private TxnState[] txnStatus; - /** - * ID of the last txn used by {@link #beginNextTransactionImpl()} - */ - private long lastTxnUsed; - - /** - * Represents a batch of transactions acquired from MetaStore - * - * @param conn - hive streaming connection - * @throws StreamingException if failed to create new RecordUpdater for batch - */ - private TransactionBatch(HiveStreamingConnection conn) throws StreamingException { - boolean success = false; - try { - if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) { - List partKeys = conn.tableObject.getPartitionKeys(); - partNameForLock = Warehouse.makePartName(partKeys, conn.staticPartitionValues); - } - this.conn = conn; - this.username = conn.username; - this.recordWriter = conn.recordWriter; - this.agentInfo = conn.agentInfo; - this.numTxns = conn.transactionBatchSize; - - setupHeartBeatThread(); - - List txnIds = openTxnImpl(username, numTxns); - txnToWriteIds = allocateWriteIdsImpl(txnIds); - assert (txnToWriteIds.size() == numTxns); - - txnStatus = new TxnState[numTxns]; - for (int i = 0; i < txnStatus.length; i++) { - assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i)); - txnStatus[i] = TxnState.OPEN; //Open matches Metastore state - } - this.state = TxnState.INACTIVE; - - // initialize record writer with connection and write id info - recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns - 1).getWriteId()); - this.minTxnId = new AtomicLong(txnIds.get(0)); - this.maxTxnId = txnIds.get(txnIds.size() - 1); - success = true; - } catch (TException e) { - throw new StreamingException(conn.toString(), e); - } finally { - //clean up if above throws - markDead(success); - } - } - - private void setupHeartBeatThread() { - // start heartbeat thread - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("HiveStreamingConnection-Heartbeat-Thread") - .build(); - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - long heartBeatInterval; - long initialDelay; - try { - // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2 - heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.conf); - } catch (LockException e) { - heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL; - } - // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager) - initialDelay = (long) (heartBeatInterval * 0.75 * Math.random()); - LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", - heartBeatInterval, initialDelay, conn.agentInfo); - Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed); - this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit - .MILLISECONDS); - } - - private List openTxnImpl(final String user, final int numTxns) throws TException { - return conn.getMSC().openTxns(user, numTxns).getTxn_ids(); - } - - private List allocateWriteIdsImpl(final List txnIds) throws TException { - return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.database, conn.table); - } - - @Override - public String toString() { - if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { - return "{}"; - } - StringBuilder sb = new 
StringBuilder(" TxnStatus["); - for (TxnState state : txnStatus) { - //'state' should not be null - future proofing - sb.append(state == null ? "N" : state); - } - sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); - return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId() - + "/" + txnToWriteIds.get(0).getWriteId() - + "..." - + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId() - + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId() - + "] on connection = " + conn + "; " + sb; - } - - private void beginNextTransaction() throws StreamingException { - checkIsClosed(); - beginNextTransactionImpl(); - } - - private void beginNextTransactionImpl() throws TransactionError { - state = TxnState.INACTIVE;//clear state from previous txn - if ((currentTxnIndex + 1) >= txnToWriteIds.size()) { - throw new InvalidTransactionState("No more transactions available in" + - " next batch for connection: " + conn + " user: " + username); - } - currentTxnIndex++; - state = TxnState.OPEN; - lastTxnUsed = getCurrentTxnId(); - lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo); - try { - LockResponse res = conn.getMSC().lock(lockRequest); - if (res.getState() != LockState.ACQUIRED) { - throw new TransactionError("Unable to acquire lock on " + conn); - } - } catch (TException e) { - throw new TransactionError("Unable to acquire lock on " + conn, e); - } - } - - long getCurrentTxnId() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.get(currentTxnIndex).getTxnId(); - } - return -1L; - } - - long getCurrentWriteId() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.get(currentTxnIndex).getWriteId(); - } - return -1L; - } - - TxnState getCurrentTransactionState() { - return state; - } - - int remainingTransactions() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.size() - currentTxnIndex - 1; - } - return txnToWriteIds.size(); - } - - - public void write(final byte[] record) throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - recordWriter.write(getCurrentWriteId(), record); - success = true; - } catch (SerializationError ex) { - //this exception indicates that a {@code record} could not be parsed and the - //caller can decide whether to drop it or send it to dead letter queue. - //rolling back the txn and retrying won't help since the tuple will be exactly the same - //when it's replayed. - success = true; - throw ex; - } finally { - markDead(success); - } - } - - public void write(final InputStream inputStream) throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - recordWriter.write(getCurrentWriteId(), inputStream); - success = true; - } catch (SerializationError ex) { - //this exception indicates that a {@code record} could not be parsed and the - //caller can decide whether to drop it or send it to dead letter queue. - //rolling back the txn and retrying won'table help since the tuple will be exactly the same - //when it's replayed. - success = true; - throw ex; - } finally { - markDead(success); - } - } - - private void checkIsClosed() throws StreamingException { - if (isTxnClosed.get()) { - throw new StreamingException("Transaction" + toString() + " is closed()"); - } - } - - /** - * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue - * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail). 
- * This ensures that a client can't ignore these failures and continue to write. - */ - private void markDead(boolean success) throws StreamingException { - if (success) { - return; - } - close(); - } - - - void commit() throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - commitImpl(); - success = true; - } finally { - markDead(success); - } - } - - private void commitImpl() throws StreamingException { - try { - recordWriter.flush(); - TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); - if (conn.isDynamicPartitioning()) { - List partNames = new ArrayList<>(recordWriter.getPartitions()); - conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table, - partNames, DataOperationType.INSERT); - } - transactionLock.lock(); - try { - conn.getMSC().commitTxn(txnToWriteId.getTxnId()); - // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. - // the current transaction is going to committed or fail, so don't need heartbeat for current transaction. - if (currentTxnIndex + 1 < txnToWriteIds.size()) { - minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); - } else { - // exhausted the batch, no longer have to heartbeat for current txn batch - minTxnId.set(-1); - } - } finally { - transactionLock.unlock(); - } - state = TxnState.COMMITTED; - txnStatus[currentTxnIndex] = TxnState.COMMITTED; - } catch (NoSuchTxnException e) { - throw new TransactionError("Invalid transaction id : " - + getCurrentTxnId(), e); - } catch (TxnAbortedException e) { - throw new TransactionError("Aborted transaction cannot be committed" - , e); - } catch (TException e) { - throw new TransactionError("Unable to commitTransaction transaction" - + getCurrentTxnId(), e); - } - } - - void abort() throws StreamingException { - if (isTxnClosed.get()) { - /* - * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all - * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction() - * error doesn't get misleading errors - */ - return; - } - abort(false); - } - - private void abort(final boolean abortAllRemaining) throws StreamingException { - abortImpl(abortAllRemaining); - } - - private void abortImpl(boolean abortAllRemaining) throws StreamingException { - if (minTxnId == null) { - return; - } - - transactionLock.lock(); - try { - if (abortAllRemaining) { - // we are aborting all txns in the current batch, so no need to heartbeat - minTxnId.set(-1); - //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn - //so we need to start from next one, if any. Also if batch was created but - //fetchTransactionBatch() was never called, we want to start with first txn - int minOpenTxnIndex = Math.max(currentTxnIndex + - (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); - for (currentTxnIndex = minOpenTxnIndex; - currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) { - conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); - txnStatus[currentTxnIndex] = TxnState.ABORTED; - } - currentTxnIndex--;//since the loop left it == txnToWriteIds.size() - } else { - // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat - // if the current txn is last in the batch. 
- if (currentTxnIndex + 1 < txnToWriteIds.size()) { - minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); - } else { - // exhausted the batch, no longer have to heartbeat - minTxnId.set(-1); - } - long currTxnId = getCurrentTxnId(); - if (currTxnId > 0) { - conn.getMSC().rollbackTxn(currTxnId); - txnStatus[currentTxnIndex] = TxnState.ABORTED; - } - } - state = TxnState.ABORTED; - } catch (NoSuchTxnException e) { - throw new TransactionError("Unable to abort invalid transaction id : " - + getCurrentTxnId(), e); - } catch (TException e) { - throw new TransactionError("Unable to abort transaction id : " - + getCurrentTxnId(), e); - } finally { - transactionLock.unlock(); - } - } - - public boolean isClosed() { - return isTxnClosed.get(); - } - - /** - * Close the TransactionBatch. This will abort any still open txns in this batch. - * - * @throws StreamingException - failure when closing transaction batch - */ - public void close() throws StreamingException { - if (isTxnClosed.get()) { - return; - } - isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async - try { - abort(true);//abort all remaining txns - } catch (Exception ex) { - LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); - throw new StreamingException("Unable to abort", ex); - } - try { - closeImpl(); - } catch (Exception ex) { - LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); - throw new StreamingException("Unable to close", ex); - } - } - - private void closeImpl() throws StreamingException { - state = TxnState.INACTIVE; - recordWriter.close(); - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - } - } - - static LockRequest createLockRequest(final HiveStreamingConnection connection, - String partNameForLock, String user, long txnId, String agentInfo) { - LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo); - requestBuilder.setUser(user); - requestBuilder.setTransactionId(txnId); - - LockComponentBuilder lockCompBuilder = new LockComponentBuilder() - .setDbName(connection.database) - .setTableName(connection.table) - .setShared() - .setOperationType(DataOperationType.INSERT); - if (connection.isDynamicPartitioning()) { - lockCompBuilder.setIsDynamicPartitionWrite(true); - } - if (partNameForLock != null && !partNameForLock.isEmpty()) { - lockCompBuilder.setPartitionName(partNameForLock); - } - requestBuilder.addLockComponent(lockCompBuilder.build()); - - return requestBuilder.build(); - } - } - private HiveConf createHiveConf(Class clazz, String metaStoreUri) { HiveConf conf = new HiveConf(clazz); if (metaStoreUri != null) { @@ -1049,6 +716,13 @@ private static void setHiveConf(HiveConf conf, String var) { conf.setBoolean(var, true); } + public List getTxnToWriteIds() { + if (currentTransactionBatch != null) { + return currentTransactionBatch.getTxnToWriteIds(); + } + return null; + } + @Override public HiveConf getHiveConf() { return conf; @@ -1083,4 +757,41 @@ public boolean isPartitionedTable() { public boolean isDynamicPartitioning() { return isPartitionedTable() && (staticPartitionValues == null || staticPartitionValues.isEmpty()); } + + @Override + public List getPartitions() { + return undeterminedPartitions; + } + + public String getUsername() { + return username; + } + + public String getDatabase() { + return database; + } + + public RecordWriter getRecordWriter() { + return recordWriter; + } + + public int getTransactionBatchSize() { + return 
transactionBatchSize; + } + + public HiveConf getConf() { + return conf; + } + + public Long getWriteId() { + return writeId; + } + + public Integer getStatementId() { + return statementId; + } + + public Long getCurrentWriteId() { + return currentTransactionBatch.getCurrentWriteId(); + } } diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java index ce9f76a6b0..02f6a42471 100644 --- a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java +++ b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java @@ -24,9 +24,18 @@ public class PartitionInfo { private String name; private String partitionLocation; - private boolean exists; + private Exists exists; - public PartitionInfo(final String name, final String partitionLocation, final boolean exists) { + /** + * Holds whether the partition exists, doesn't exist or we don't know. + */ + public enum Exists { + EXISTS, + NOT_EXISTS, + UNKNOWN + } + + public PartitionInfo(final String name, final String partitionLocation, final Exists exists) { this.name = name; this.partitionLocation = partitionLocation; this.exists = exists; @@ -48,11 +57,11 @@ public void setPartitionLocation(final String partitionLocation) { this.partitionLocation = partitionLocation; } - public boolean isExists() { + public Exists getExists() { return exists; } - public void setExists(final boolean exists) { + public void setExists(final Exists exists) { this.exists = exists; } } diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java index d9c4455e61..0c8aabe543 100644 --- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java @@ -19,8 +19,12 @@ package org.apache.hive.streaming; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.metadata.Table; + import java.io.InputStream; +import java.util.List; import java.util.Set; public interface RecordWriter { @@ -35,6 +39,20 @@ */ void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException; + /** + * Initialize record writer. + * + * @param connection - streaming connection + * @param minWriteId - min write id + * @param maxWriteID - max write id + * @param statementId - statemenId. Note this number can't be bigger than 2^12 + * @throws StreamingException - thrown when initialization failed + */ + default void init(StreamingConnection connection, long minWriteId, + long maxWriteID, int statementId) throws StreamingException { + init(connection, minWriteId, maxWriteID); + } + /** * Writes using a hive RecordUpdater. * @@ -74,4 +92,27 @@ * @return - set of partitions */ Set getPartitions(); + + /** + * Get the set of partitions that were use but that it wasn't possible + * to determine if they were used or not. + * + * @return - set of partitions + */ + Set getUndeterminedPartitions(); + + /** + * Returns the location of the delta directory. 
+ * @param partitionValues partition values + * @param bucketId bucket id + * @param minWriteId min write Id + * @param maxWriteId max write Id + * @param statementId statement Id + * @param table table + * @return the location of the file + * @throws StreamingException when the path is not found + */ + Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId, + Table table) throws StreamingException; } diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java index fbe00db1f7..dc6c77d06c 100644 --- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -19,9 +19,14 @@ package org.apache.hive.streaming; import java.io.InputStream; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import javax.annotation.Nullable; + public interface StreamingConnection extends ConnectionInfo, PartitionHandler { /** * Returns hive configuration object used during connection creation. @@ -60,6 +65,16 @@ */ void commitTransaction() throws StreamingException; + /** + * Commit a transaction to make the writes visible for readers. Include + * other partitions that may have been added independently. + * + * @param partitions - extra partitions to commit. + * @throws StreamingException - if there are errors when committing the open transaction. + */ + void commitTransactionWithPartition(@Nullable Set partitions) + throws StreamingException; + /** * Manually abort the opened transaction. * @@ -78,4 +93,26 @@ * @return - connection stats */ ConnectionStats getConnectionStats(); + + /** + * Get the partitions used during the streaming. This partitions haven't + * been committed to the metastore. + * @return partitions. + */ + List getPartitions(); + + /** + * Returns the file that would be used by the writer to write the rows. + * given the parameters + * @param partitionValues partition values + * @param bucketId bucket id + * @param minWriteId min write Id + * @param maxWriteId max write Id + * @param statementId statement Id + * @return the location of the file. + * @throws StreamingException when the path is not found + */ + Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId) + throws StreamingException; } diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java new file mode 100644 index 0000000000..0777fee0b8 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hive.streaming.transaction.TransactionError; +import org.apache.hive.streaming.transaction.TxnState; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Streaming transaction to use most of the times. Will query the + * metastore to get the transaction ids and the writer ids and then + * will commit them. + */ +public class TransactionBatch extends AbstractStreamingTransaction { + private static final Logger LOG = LoggerFactory.getLogger( + TransactionBatch.class.getName()); + private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000; + protected List createdPartitions = null; + private String username; + private HiveStreamingConnection conn; + private ScheduledExecutorService scheduledExecutorService; + private String partNameForLock = null; + private LockRequest lockRequest = null; + // heartbeats can only be sent for open transactions. + // there is a race between committing/aborting a transaction and heartbeat. + // Example: If a heartbeat is sent for committed txn, exception will be thrown. + // Similarly if we don't send a heartbeat, metastore server might abort a txn + // for missed heartbeat right before commit txn call. + // This lock is used to mutex commit/abort and heartbeat calls + private final ReentrantLock transactionLock = new ReentrantLock(); + // min txn id is incremented linearly within a transaction batch. + // keeping minTxnId atomic as it is shared with heartbeat thread + private final AtomicLong minTxnId; + // max txn id does not change for a transaction batch + private final long maxTxnId; + + private String agentInfo; + private int numTxns; + /** + * Tracks the state of each transaction. + */ + private TxnState[] txnStatus; + /** + * ID of the last txn used by {@link #beginNextTransactionImpl()}. + */ + private long lastTxnUsed; + + /** + * Represents a batch of transactions acquired from MetaStore. 
+ * + * @param conn - hive streaming connection + * @throws StreamingException if failed to create new RecordUpdater for batch + */ + public TransactionBatch(HiveStreamingConnection conn) throws StreamingException { + boolean success = false; + try { + if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) { + List partKeys = conn.getTable().getPartitionKeys(); + partNameForLock = Warehouse.makePartName(partKeys, conn.getStaticPartitionValues()); + } + this.conn = conn; + this.username = conn.getUsername(); + this.recordWriter = conn.getRecordWriter(); + this.agentInfo = conn.getAgentInfo(); + this.numTxns = conn.getTransactionBatchSize(); + + setupHeartBeatThread(); + + List txnIds = openTxnImpl(username, numTxns); + txnToWriteIds = allocateWriteIdsImpl(txnIds); + assert (txnToWriteIds.size() == numTxns); + + txnStatus = new TxnState[numTxns]; + for (int i = 0; i < txnStatus.length; i++) { + assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i)); + txnStatus[i] = TxnState.OPEN; //Open matches Metastore state + } + this.state = TxnState.INACTIVE; + + // initialize record writer with connection and write id info + recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), + txnToWriteIds.get(numTxns - 1).getWriteId(), conn.getStatementId()); + this.minTxnId = new AtomicLong(txnIds.get(0)); + this.maxTxnId = txnIds.get(txnIds.size() - 1); + success = true; + } catch (TException e) { + throw new StreamingException(conn.toString(), e); + } finally { + //clean up if above throws + markDead(success); + } + } + + private static class HeartbeatRunnable implements Runnable { + private final HiveStreamingConnection conn; + private final AtomicLong minTxnId; + private final long maxTxnId; + private final ReentrantLock transactionLock; + private final AtomicBoolean isTxnClosed; + + HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId, + final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) { + this.conn = conn; + this.minTxnId = minTxnId; + this.maxTxnId = maxTxnId; + this.transactionLock = transactionLock; + this.isTxnClosed = isTxnClosed; + } + + @Override + public void run() { + transactionLock.lock(); + try { + if (minTxnId.get() > 0) { + HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId); + if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { + LOG.error("Heartbeat failure: {}", resp.toString()); + isTxnClosed.set(true); + } else { + LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId); + } + } + } catch (TException e) { + LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e); + } finally { + transactionLock.unlock(); + } + } + } + + private void setupHeartBeatThread() { + // start heartbeat thread + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("HiveStreamingConnection-Heartbeat-Thread") + .build(); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + long heartBeatInterval; + long initialDelay; + try { + // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2 + heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.getConf()); + } catch (LockException e) { + heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL; + } + // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager) + initialDelay = (long) (heartBeatInterval * 0.75 * 
Math.random()); + LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", + heartBeatInterval, initialDelay, conn.getAgentInfo()); + Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed); + this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit + .MILLISECONDS); + } + + private List openTxnImpl(final String user, final int numTxns) throws TException { + return conn.getMSC().openTxns(user, numTxns).getTxn_ids(); + } + + private List allocateWriteIdsImpl(final List txnIds) throws TException { + return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.getDatabase(), + conn.getTable().getTableName()); + } + + @Override + public String toString() { + if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { + return "{}"; + } + StringBuilder sb = new StringBuilder(" TxnStatus["); + for (TxnState state : txnStatus) { + //'state' should not be null - future proofing + sb.append(state == null ? "N" : state); + } + sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); + return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId() + + "/" + txnToWriteIds.get(0).getWriteId() + + "..." + + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId() + + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId() + + "] on connection = " + conn + "; " + sb; + } + + public void beginNextTransaction() throws StreamingException { + checkIsClosed(); + beginNextTransactionImpl(); + } + + private void beginNextTransactionImpl() throws StreamingException { + beginNextTransactionImpl("No more transactions available in" + + " next batch for connection: " + conn + " user: " + username); + lastTxnUsed = getCurrentTxnId(); + lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo); + createdPartitions = Lists.newArrayList(); + try { + LockResponse res = conn.getMSC().lock(lockRequest); + if (res.getState() != LockState.ACQUIRED) { + throw new TransactionError("Unable to acquire lock on " + conn); + } + } catch (TException e) { + throw new TransactionError("Unable to acquire lock on " + conn, e); + } + } + + public void commitWithPartitions(Set partitions) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + commitImpl(partitions); + success = true; + } finally { + markDead(success); + } + } + + private void commitImpl(Set partitions) throws StreamingException { + try { + recordWriter.flush(); + TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); + if (conn.isDynamicPartitioning()) { + List partNames = new ArrayList<>(recordWriter.getPartitions()); + createdPartitions.addAll(partNames); + if (partitions != null) { + partNames.addAll(partitions); + } + if (!partNames.isEmpty()) { + conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), + txnToWriteId.getWriteId(), conn.getDatabase(), + conn.getTable().getTableName(), partNames, + DataOperationType.INSERT); + } + } + transactionLock.lock(); + try { + conn.getMSC().commitTxn(txnToWriteId.getTxnId()); + // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. + // the current transaction is going to committed or fail, so don't need heartbeat for current transaction. 
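+        // Illustrative example (not in the original patch): for a batch of 5 txns with ids 10..14,
+        // committing the txn at index 2 (id 12) advances the heartbeat range to [13, 14]; committing
+        // the last txn sets minTxnId to -1, which HeartbeatRunnable above treats as "no open txns
+        // left in this batch" and so stops heartbeating for it.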
+ if (currentTxnIndex + 1 < txnToWriteIds.size()) { + minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); + } else { + // exhausted the batch, no longer have to heartbeat for current txn batch + minTxnId.set(-1); + } + } finally { + transactionLock.unlock(); + } + state = TxnState.COMMITTED; + txnStatus[currentTxnIndex] = TxnState.COMMITTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TxnAbortedException e) { + throw new TransactionError("Aborted transaction " + + "cannot be committed", e); + } catch (TException e) { + throw new TransactionError("Unable to commitTransaction transaction" + + getCurrentTxnId(), e); + } + } + + public void abort() throws StreamingException { + if (isTxnClosed.get()) { + /* + * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all + * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction() + * error doesn't get misleading errors + */ + return; + } + abort(false); + } + + private void abort(final boolean abortAllRemaining) throws StreamingException { + abortImpl(abortAllRemaining); + } + + private void abortImpl(boolean abortAllRemaining) throws StreamingException { + if (minTxnId == null) { + return; + } + + transactionLock.lock(); + try { + if (abortAllRemaining) { + // we are aborting all txns in the current batch, so no need to heartbeat + minTxnId.set(-1); + //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn + //so we need to start from next one, if any. Also if batch was created but + //fetchTransactionBatch() was never called, we want to start with first txn + int minOpenTxnIndex = Math.max(currentTxnIndex + + (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); + for (currentTxnIndex = minOpenTxnIndex; + currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) { + conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); + txnStatus[currentTxnIndex] = TxnState.ABORTED; + } + currentTxnIndex--; //since the loop left it == txnToWriteIds.size() + } else { + // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat + // if the current txn is last in the batch. + if (currentTxnIndex + 1 < txnToWriteIds.size()) { + minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); + } else { + // exhausted the batch, no longer have to heartbeat + minTxnId.set(-1); + } + long currTxnId = getCurrentTxnId(); + if (currTxnId > 0) { + conn.getMSC().rollbackTxn(currTxnId); + txnStatus[currentTxnIndex] = TxnState.ABORTED; + } + } + state = TxnState.ABORTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Unable to abort invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TException e) { + throw new TransactionError("Unable to abort transaction id : " + + getCurrentTxnId(), e); + } finally { + transactionLock.unlock(); + } + } + + /** + * Close the TransactionBatch. This will abort any still open txns in this batch. 
+ * + * @throws StreamingException - failure when closing transaction batch + */ + public void close() throws StreamingException { + if (isClosed()) { + return; + } + isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async + try { + abort(true); //abort all remaining txns + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to abort", ex); + } + try { + closeImpl(); + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to close", ex); + } + } + + private void closeImpl() throws StreamingException { + state = TxnState.INACTIVE; + recordWriter.close(); + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + } + + private static LockRequest createLockRequest(final HiveStreamingConnection connection, + String partNameForLock, String user, long txnId, String agentInfo) { + LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo); + requestBuilder.setUser(user); + requestBuilder.setTransactionId(txnId); + + LockComponentBuilder lockCompBuilder = new LockComponentBuilder() + .setDbName(connection.getDatabase()) + .setTableName(connection.getTable().getTableName()) + .setShared() + .setOperationType(DataOperationType.INSERT); + if (connection.isDynamicPartitioning()) { + lockCompBuilder.setIsDynamicPartitionWrite(true); + } + if (partNameForLock != null && !partNameForLock.isEmpty()) { + lockCompBuilder.setPartitionName(partNameForLock); + } + requestBuilder.addLockComponent(lockCompBuilder.build()); + + return requestBuilder.build(); + } + + /** + * @return the list of created partitions. + */ + @Override + public List getCreatedPartitions() { + return createdPartitions; + } + + /** + * This class has a connection to the metastore so it always creates + * the partitions that is going to use. + * @return empty list + */ + @Override + public List getUnderminedPartitions() { + return Collections.emptyList(); + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java new file mode 100644 index 0000000000..f1cd454f12 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.streaming; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hive.streaming.transaction.TxnState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Receives a single writeId. Doesn't open connections to the metastore + * so the commit has to be done externally by the entity that created + * the writeId. + */ +public class UnManagedSingleTransaction extends AbstractStreamingTransaction { + private static final Logger LOG = LoggerFactory.getLogger( + UnManagedSingleTransaction.class.getName()); + private final String username; + private final HiveStreamingConnection conn; + private final List undeterminedPartitions = Lists.newArrayList(); + + public UnManagedSingleTransaction(HiveStreamingConnection conn) + throws StreamingException{ + assert conn.getWriteId() != null; + + this.conn = conn; + this.username = conn.getUsername(); + this.recordWriter = conn.getRecordWriter(); + this.state = TxnState.INACTIVE; + + txnToWriteIds = Lists.newArrayList(new TxnToWriteId(-1, + conn.getWriteId())); + + boolean success = false; + try { + recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), + txnToWriteIds.get(0).getWriteId(), conn.getStatementId()); + success = true; + } finally { + markDead(success); + } + } + + @Override + public void beginNextTransaction() throws StreamingException { + beginNextTransactionImpl("No more transactions available in" + + " next batch for connection: " + conn + " user: " + username); + } + + @Override + public void commitWithPartitions(Set partitions) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + commitImpl(); + success = true; + } finally { + markDead(success); + } + } + + private void commitImpl() throws StreamingException { + recordWriter.flush(); + List partNames = new ArrayList<>(recordWriter.getUndeterminedPartitions()); + undeterminedPartitions.addAll(partNames); + state = TxnState.PREPARED_FOR_COMMIT; + } + + @Override + public void abort() { + if (isTxnClosed.get()) { + return; + } + state = TxnState.ABORTED; + } + + @Override + public void close() throws StreamingException { + if (isClosed()) { + return; + } + isTxnClosed.set(true); + abort(); + try { + closeImpl(); + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to close", ex); + } + } + + private void closeImpl() throws StreamingException { + state = TxnState.INACTIVE; + recordWriter.close(); + } + + @Override + public String toString() { + if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { + return "{}"; + } + return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getWriteId() + + "] on connection = " + conn + "; " + "status=" + state; + } + + /** + * @return This class doesn't have a connection to the metastore so it won't + * create any partition + */ + @Override + public List getCreatedPartitions() { + return Collections.emptyList(); + } + + /** + * @return partitions used by the writer from this class. 
+ */ + @Override + public List getUnderminedPartitions() { + return undeterminedPartitions; + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/package.html b/streaming/src/java/org/apache/hive/streaming/package.html index 2b45792807..5bfe04ce44 100644 --- a/streaming/src/java/org/apache/hive/streaming/package.html +++ b/streaming/src/java/org/apache/hive/streaming/package.html @@ -43,7 +43,8 @@

HCatalog Streaming API -- high level description

Note on packaging: The APIs are defined in the -org.apache.hive.streaming Java package and included as +org.apache.hive.streaming and +org.apache.hive.streaming.transaction Java packages and included as the hive-streaming jar.

STREAMING REQUIREMENTS

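Editor's note: the two transaction paths introduced above (the metastore-managed TransactionBatch and the write-id-driven UnManagedSingleTransaction) are easiest to see side by side. The following is a minimal sketch distilled from testConnectionWithWriteId and testAddDynamicPartitionWithWriteId later in this patch; it is not part of the patch itself, the class and variable names are illustrative, and the String element type of getPartitions() is assumed.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class WriteIdStreamingSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();

    // Parent connection: opens the transaction and therefore owns the write id.
    StrictDelimitedInputWriter parentWriter = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',').build();
    HiveStreamingConnection parent = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("writeiddynamic")        // table/db names are illustrative
        .withRecordWriter(parentWriter)
        .withHiveConf(conf)
        .connect();
    parent.beginTransaction();

    Table tableObject = parent.getTable();
    Long writeId = parent.getCurrentWriteId();

    // Child connection: reuses the parent's write id and Table object, so it never
    // contacts the metastore for txn/write-id allocation (UnManagedSingleTransaction path).
    StrictDelimitedInputWriter childWriter = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',').build();
    HiveStreamingConnection child = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("writeiddynamic")
        .withRecordWriter(childWriter)
        .withHiveConf(conf)
        .withWriteId(writeId)
        .withStatementId(1)
        .withTableObject(tableObject)
        .connect();

    child.beginTransaction();
    child.write("1,2,3,4".getBytes());
    // For a write-id connection this only flushes; state becomes PREPARED_FOR_COMMIT
    // and nothing is visible until the parent commits.
    child.commitTransaction();
    Set<String> partitions = new HashSet<>(child.getPartitions()); // element type assumed
    child.close();

    // The parent makes the data, and any partitions touched by the child, visible.
    parent.commitTransactionWithPartition(partitions);
    parent.close();
  }
}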
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java b/streaming/src/java/org/apache/hive/streaming/transaction/InvalidTransactionState.java similarity index 87% rename from streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java rename to streaming/src/java/org/apache/hive/streaming/transaction/InvalidTransactionState.java index 9d92dfa516..67a2d25ea8 100644 --- a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java +++ b/streaming/src/java/org/apache/hive/streaming/transaction/InvalidTransactionState.java @@ -16,10 +16,13 @@ * limitations under the License. */ -package org.apache.hive.streaming; +package org.apache.hive.streaming.transaction; +/** + * Invalid transaction. + */ public class InvalidTransactionState extends TransactionError { - InvalidTransactionState(String msg) { + public InvalidTransactionState(String msg) { super(msg); } } diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/StreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/transaction/StreamingTransaction.java new file mode 100644 index 0000000000..7bf624049c --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/StreamingTransaction.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hive.streaming.StreamingException; +import java.io.InputStream; +import java.util.List; +import java.util.Set; + +/** + * Common interface for transaction in HiveStreamingConnection. + */ +public interface StreamingTransaction { + void beginNextTransaction() throws StreamingException; + void commit() throws StreamingException; + void commitWithPartitions(Set partitions) throws StreamingException; + void abort() throws StreamingException; + void write(byte[] record) throws StreamingException; + void write(InputStream record) throws StreamingException; + void close() throws StreamingException; + + boolean isClosed(); + TxnState getCurrentTransactionState(); + int remainingTransactions(); + long getCurrentTxnId(); + long getCurrentWriteId(); + + /** + * Get the partitions that were created in this transaction. + * @return list of partitions + */ + List getCreatedPartitions(); + + /** + * Get the partitions that were used but we don't know if they need to + * be created or not. They may exist. 
+ * @return list of partitions + */ + List getUnderminedPartitions(); + List getTxnToWriteIds(); +} diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java b/streaming/src/java/org/apache/hive/streaming/transaction/TransactionError.java similarity index 81% rename from streaming/src/java/org/apache/hive/streaming/TransactionError.java rename to streaming/src/java/org/apache/hive/streaming/transaction/TransactionError.java index ae56e7c726..98dc613b3e 100644 --- a/streaming/src/java/org/apache/hive/streaming/TransactionError.java +++ b/streaming/src/java/org/apache/hive/streaming/transaction/TransactionError.java @@ -16,14 +16,19 @@ * limitations under the License. */ -package org.apache.hive.streaming; +package org.apache.hive.streaming.transaction; +import org.apache.hive.streaming.StreamingException; + +/** + * Transaction error. + */ public class TransactionError extends StreamingException { - TransactionError(String msg, Exception e) { + public TransactionError(String msg, Exception e) { super(msg + (e == null ? "" : ": " + e.getMessage()), e); } - TransactionError(String msg) { + public TransactionError(String msg) { super(msg); } } diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/TxnState.java b/streaming/src/java/org/apache/hive/streaming/transaction/TxnState.java new file mode 100644 index 0000000000..7b12d116c4 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/TxnState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +/** + * States for a transaction. + */ +public enum TxnState { + INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"), + PREPARED_FOR_COMMIT("P"); + + private final String code; + + TxnState(String code) { + this.code = code; + } + + public String toString() { + return code; + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/package-info.java b/streaming/src/java/org/apache/hive/streaming/transaction/package-info.java new file mode 100644 index 0000000000..3c830120ef --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package grouping transaction classes. + */ +package org.apache.hive.streaming.transaction; diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 8b5e508d94..e8b52a61d2 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,13 +61,13 @@ import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; -import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; @@ -82,6 +83,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.RecordReader; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.compactor.Worker; @@ -96,6 +98,8 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hive.streaming.transaction.TransactionError; +import org.apache.hive.streaming.transaction.TxnState; import org.apache.orc.impl.OrcAcidUtils; import org.apache.orc.tools.FileDump; import org.apache.thrift.TException; @@ -150,7 +154,8 @@ public FileStatus getFileStatus(Path path) throws IOException { if (file.canExecute()) { mod |= 0111; } - return new FileStatus(file.length(), file.isDirectory(), 1, 1024, + return new FileStatus(file.length(), file.isDirectory(), + 1, 1024, file.lastModified(), file.lastModified(), FsPermission.createImmutable(mod), "owen", "users", path); } @@ -418,6 +423,123 @@ public void testNoBuckets() throws Exception { Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000")); } + @Test + public void testGetDeltaPath() throws Exception { + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withRecordWriter(writer) + 
.withHiveConf(conf) + .connect(); + Path path = connection.getDeltaFileLocation(partitionVals, 0, + 5L, 5L, 9); + Assert.assertTrue(path.toString().endsWith("testing.db/alerts/continent" + + "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000")); + } + + @Test + public void testConnectionWithWriteId() throws Exception { + queryTable(driver, "drop table if exists default.writeidconnection"); + queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " + + "TBLPROPERTIES('transactional'='true')"); + queryTable(driver, "insert into default.writeidconnection values('a0','bar')"); + + List rs = queryTable(driver, "select * from default.writeidconnection"); + Assert.assertEquals(1, rs.size()); + Assert.assertEquals("a0\tbar", rs.get(0)); + + StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("writeidconnection") + .withRecordWriter(writerT) + .withHiveConf(conf) + .connect(); + transactionConnection.beginTransaction(); + + Table tObject = transactionConnection.getTable(); + Long writeId = transactionConnection.getCurrentWriteId(); + + Assert.assertNotNull(tObject); + Assert.assertNotNull(writeId); + + StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("writeidconnection") + .withRecordWriter(writerOne) + .withHiveConf(conf) + .withWriteId(writeId) + .withStatementId(1) + .withTableObject(tObject) + .connect(); + + StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withRecordWriter(writerTwo) + .withHiveConf(conf) + .withWriteId(writeId) + .withStatementId(2) + .withTableObject(tObject) + .connect(); + + Assert.assertNotNull(connectionOne); + Assert.assertNotNull(connectionTwo); + + connectionOne.beginTransaction(); + connectionTwo.beginTransaction(); + connectionOne.write("a1,b2".getBytes()); + connectionTwo.write("a5,b6".getBytes()); + connectionOne.write("a3,b4".getBytes()); + connectionOne.commitTransaction(); + connectionTwo.commitTransaction(); + + Assert.assertEquals(TxnState.PREPARED_FOR_COMMIT, + connectionOne.getCurrentTransactionState()); + Assert.assertEquals(TxnState.PREPARED_FOR_COMMIT, + connectionTwo.getCurrentTransactionState()); + + try { + connectionOne.beginTransaction(); + Assert.fail("second beginTransaction should have thrown a " + + "StreamingException"); + } catch (StreamingException e) { + + } + + connectionOne.close(); + connectionTwo.close(); + + rs = queryTable(driver, "select ROW__ID, a, b, " + + "INPUT__FILE__NAME from default.writeidconnection order by ROW__ID"); + // Nothing here since it hasn't been committed + Assert.assertEquals(1, rs.size()); + + transactionConnection.commitTransaction(); + + rs = queryTable(driver, "select ROW__ID, a, b, " + + "INPUT__FILE__NAME from default.writeidconnection order by a"); + Assert.assertEquals(4, rs.size()); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(1), 
rs.get(1).contains("\"rowid\":0}\ta1\tb2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000")); + } + @Test public void testAllTypesDelimitedWriter() throws Exception { queryTable(driver, "drop table if exists default.alltypes"); @@ -956,11 +1078,151 @@ public void testAddPartition() throws Exception { // Create partition Assert.assertNotNull(connection); + connection.beginTransaction(); + connection.write("3,Hello streaming - once again".getBytes()); + connection.commitTransaction(); + + // Ensure partition is present + Partition p = msClient.getPartition(dbName, tblName, newPartVals); + Assert.assertNotNull("Did not find added partition", p); + } + + @Test + public void testAddPartitionWithWriteId() throws Exception { + List newPartVals = new ArrayList(2); + newPartVals.add("WriteId_continent"); + newPartVals.add("WriteId_country"); + + StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(newPartVals) + .withRecordWriter(writerT) + .withHiveConf(conf) + .connect(); + transactionConnection.beginTransaction(); + + Table tObject = transactionConnection.getTable(); + Long writeId = transactionConnection.getCurrentWriteId(); + + Assert.assertNotNull(tObject); + Assert.assertNotNull(writeId); + + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(newPartVals) + .withRecordWriter(writer) + .withHiveConf(conf) + .withWriteId(writeId) + .withStatementId(1) + .withTableObject(tObject) + .connect(); + + Assert.assertNotNull(connection); + + connection.beginTransaction(); + connection.write("3,Hello streaming - once again".getBytes()); + connection.commitTransaction(); + + Set partitions = new HashSet<>(connection.getPartitions()); + + connection.close(); + + // Ensure partition is not present + try { + msClient.getPartition(dbName, tblName, newPartVals); + Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised"); + } catch (NoSuchObjectException e) {} + + transactionConnection.commitTransactionWithPartition(partitions); + // Ensure partition is present - Partition p = msClient.getPartition(dbName, tblName, partitionVals); + Partition p = msClient.getPartition(dbName, tblName, newPartVals); Assert.assertNotNull("Did not find added partition", p); } + @Test + public void testAddDynamicPartitionWithWriteId() throws Exception { + queryTable(driver, "drop table if exists default.writeiddynamic"); + queryTable(driver, "create table default.writeiddynamic (a" + + " string, b string) partitioned by (c string, d string)" + + " stored as orc TBLPROPERTIES('transactional'='true')"); + + StrictDelimitedInputWriter writerT = + StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + HiveStreamingConnection transactionConnection = + 
HiveStreamingConnection.newBuilder().withDatabase("default").withTable("writeiddynamic").withRecordWriter(writerT) + .withHiveConf(conf).connect(); + transactionConnection.beginTransaction(); + + Table tObject = transactionConnection.getTable(); + Long writeId = transactionConnection.getCurrentWriteId(); + + Assert.assertNotNull(tObject); + Assert.assertNotNull(writeId); + + StrictDelimitedInputWriter writerOne = + StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + HiveStreamingConnection connectionOne = + HiveStreamingConnection.newBuilder().withDatabase("default").withTable("writeiddynamic").withRecordWriter(writerOne) + .withHiveConf(conf).withWriteId(writeId).withStatementId(1).withTableObject(tObject).connect(); + + StrictDelimitedInputWriter writerTwo = + StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + HiveStreamingConnection connectionTwo = + HiveStreamingConnection.newBuilder().withDatabase("default").withTable("writeiddynamic").withRecordWriter(writerTwo) + .withHiveConf(conf).withWriteId(writeId).withStatementId(1).withTableObject(tObject).connect(); + + Assert.assertNotNull(connectionOne); + + connectionTwo.beginTransaction(); + connectionOne.beginTransaction(); + connectionOne.write("1,2,3,4".getBytes()); + connectionOne.write("1,2,5,6".getBytes()); + connectionTwo.write("1,2,30,40".getBytes()); + connectionOne.write("1,2,7,8".getBytes()); + connectionTwo.write("1,2,50,60".getBytes()); + connectionOne.write("1,2,9,10".getBytes()); + connectionOne.commitTransaction(); + connectionTwo.commitTransaction(); + + Set partitionsOne = new HashSet<>(connectionOne.getPartitions()); + Assert.assertEquals(4, partitionsOne.size()); + + Set partitionsTwo = new HashSet<>(connectionTwo.getPartitions()); + Assert.assertEquals(2, partitionsTwo.size()); + + connectionOne.close(); + connectionTwo.close(); + + try { + String partitionName = partitionsOne.iterator().next(); + msClient.getPartition("default", "writeiddynamic", partitionName); + Assert.fail( + "Partition shouldn't exist so a NoSuchObjectException should have been raised"); + } catch (NoSuchObjectException e) { + } + + partitionsOne.addAll(partitionsTwo); + Set allPartitions = partitionsOne; + transactionConnection.commitTransactionWithPartition(allPartitions); + + // Ensure partition is present + for (String partition : allPartitions) { + Partition p = + msClient.getPartition("default", "writeiddynamic", + partition); + Assert.assertNotNull("Did not find added partition", p); + } + } + @Test public void testTransactionBatchEmptyCommit() throws Exception { // 1) to partitioned table @@ -977,8 +1239,8 @@ public void testTransactionBatchEmptyCommit() throws Exception { .connect(); connection.beginTransaction(); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); // 2) To unpartitioned table @@ -995,8 +1257,8 @@ public void testTransactionBatchEmptyCommit() throws Exception { connection.beginTransaction(); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1115,8 +1377,8 @@ public void testTransactionBatchEmptyAbort() throws Exception { connection.beginTransaction(); 
connection.abortTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); // 2) to unpartitioned table @@ -1133,8 +1395,8 @@ public void testTransactionBatchEmptyAbort() throws Exception { connection.beginTransaction(); connection.abortTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1156,20 +1418,20 @@ public void testTransactionBatchCommitDelimited() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); // 2nd Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("2,Welcome to streaming".getBytes()); // data should not be visible @@ -1182,8 +1444,8 @@ public void testTransactionBatchCommitDelimited() throws Exception { connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); // To Unpartitioned table @@ -1199,13 +1461,13 @@ public void testTransactionBatchCommitDelimited() throws Exception { .connect(); // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1227,20 +1489,20 @@ public void testTransactionBatchCommitRegex() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); // 2nd Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("2,Welcome to streaming".getBytes()); // data should not be 
visible @@ -1252,8 +1514,8 @@ public void testTransactionBatchCommitRegex() throws Exception { "{2, Welcome to streaming}"); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); // To Unpartitioned table regex = "([^:]*):(.*)"; @@ -1271,13 +1533,13 @@ public void testTransactionBatchCommitRegex() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1:Hello streaming".getBytes()); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1328,20 +1590,20 @@ public void testTransactionBatchCommitJson() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}"; connection.write(rec1.getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); List rs = queryTable(driver, "select * from " + dbName + "." 
+ tblName); Assert.assertEquals(1, rs.size()); @@ -1363,7 +1625,7 @@ public void testJsonInputStream() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, connection.getCurrentTransactionState()); String records = "{\"id\" : 1, \"msg\": \"Hello streaming\"}|{\"id\" : 2, \"msg\": \"Hello world\"}|{\"id\" : 3, " + "\"msg\": \"Hello world!!\"}"; ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes()); @@ -1399,20 +1661,20 @@ public void testRemainingTransactions() throws Exception { connection.beginTransaction(); Assert.assertEquals(--initialCount, connection.remainingTransactions()); for (int rec = 0; rec < 2; ++rec) { - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write((batch * rec + ",Hello streaming").getBytes()); } connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); ++batch; } Assert.assertEquals(0, connection.remainingTransactions()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); connection = HiveStreamingConnection.newBuilder() .withDatabase(dbName) @@ -1430,20 +1692,20 @@ public void testRemainingTransactions() throws Exception { connection.beginTransaction(); Assert.assertEquals(--initialCount, connection.remainingTransactions()); for (int rec = 0; rec < 2; ++rec) { - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write((batch * rec + ",Hello streaming").getBytes()); } connection.abortTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); ++batch; } Assert.assertEquals(0, connection.remainingTransactions()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); } @Test @@ -1468,8 +1730,8 @@ public void testTransactionBatchAbort() throws Exception { checkNothingWritten(partLoc); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); @@ -1507,8 +1769,8 @@ public void testTransactionBatchAbortAndCommit() throws Exception { checkNothingWritten(partLoc); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.beginTransaction(); connection.write("1,Hello streaming".getBytes()); @@ -1577,8 +1839,8 @@ public void testMultipleTransactionBatchCommits() throws Exception { "2\tWelcome to streaming", "3\tHello streaming - once again", "4\tWelcome to streaming - once again"); - 
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1690,10 +1952,10 @@ public void testInterleavedTransactionBatchCommits() throws Exception { "3\tHello streaming - once again", "4\tWelcome to streaming - once again"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection2.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection2.getCurrentTransactionState()); connection.close(); connection2.close(); @@ -2511,7 +2773,8 @@ private void recordOffsets(final HiveConf conf, final String dbLocation, } @Test - public void testErrorHandling() throws Exception { + public void testErrorHandling() + throws Exception { String agentInfo = "UT_" + Thread.currentThread().getName(); runCmdOnDriver("create database testErrors"); runCmdOnDriver("use testErrors"); @@ -2538,8 +2801,12 @@ public void testErrorHandling() throws Exception { GetOpenTxnsInfoResponse r = msClient.showTxns(); Assert.assertEquals("HWM didn'table match", 17, r.getTxn_high_water_mark()); List ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); - Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + Assert.assertEquals("wrong status ti(0)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(1).getState()); try { @@ -2621,10 +2888,16 @@ public void testErrorHandling() throws Exception { r = msClient.showTxns(); Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); - Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + Assert.assertEquals("wrong status ti(0)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(1).getState()); //txnid 3 was committed and thus not open - Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState()); + Assert.assertEquals("wrong status ti(2)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(2).getState()); connection.close(); writer.disableErrors(); @@ -2651,8 +2924,12 @@ public void testErrorHandling() throws Exception { r = msClient.showTxns(); Assert.assertEquals("HWM didn'table match", 21, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState()); - Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState()); + Assert.assertEquals("wrong status ti(3)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(3).getState()); + Assert.assertEquals("wrong status ti(4)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(4).getState()); } // assumes un partitioned table @@ -2953,5 +3230,17 @@ void enableErrors() { void disableErrors() { shouldThrow = false; } + + @Override + public Path getDeltaFileLocation(List 
partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId, + Table table) throws StreamingException { + return null; + } + + @Override public Set getUndeterminedPartitions() { + return null; + } } + } diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java index 32a6d06c2b..d9b78ffea8 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hive.streaming.transaction.TxnState; import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; @@ -631,7 +632,7 @@ public void testJsonInputStreamDP() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, connection.getCurrentTransactionState()); String records = "{\"id\" : 1, \"msg\": \"Hello streaming\", \"continent\": \"Asia\", \"Country\": \"India\"}|" + "{\"id\" : 2, \"msg\": \"Hello world\", \"continent\": \"Europe\", \"Country\": \"Germany\"}|" + "{\"id\" : 3, \"msg\": \"Hello world!!\", \"continent\": \"Asia\", \"Country\": \"China\"}|" +