diff --git streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 9e90d36dae..88a7d82a04 100644 --- streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -46,9 +46,11 @@ import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -66,6 +68,7 @@ private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName()); private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]"; + private Integer statementId; protected HiveConf conf; protected StreamingConnection conn; protected Table table; @@ -128,13 +131,21 @@ public void memoryUsageAboveThreshold(final long usedMemory, final long maxMemor } @Override - public void init(StreamingConnection conn, long minWriteId, long maxWriteId) throws StreamingException { + public void init(StreamingConnection conn, long minWriteId, long maxWriteId) + throws StreamingException { + init(conn, minWriteId, maxWriteId, -1); + } + + @Override + public void init(StreamingConnection conn, long minWriteId, long maxWriteId, + int statementId) throws StreamingException { if (conn == null) { throw new StreamingException("Streaming connection cannot be null during record writer initialization"); } this.conn = conn; this.curBatchMinWriteId = minWriteId; this.curBatchMaxWriteId = maxWriteId; + this.statementId = statementId; this.conf = conn.getHiveConf(); this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME); this.table = conn.getTable(); @@ -431,6 +442,7 @@ public void write(final long writeId, final byte[] record) throws StreamingExcep int bucket = getBucket(encodedRow); List partitionValues = getPartitionValues(encodedRow); getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow); + // ingest size bytes gets resetted on flush() whereas connection stats is not conn.getConnectionStats().incrementRecordsWritten(); conn.getConnectionStats().incrementRecordsSize(record.length); @@ -492,10 +504,53 @@ protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucket .tableProperties(tblProperties) .minimumWriteId(minWriteId) .maximumWriteId(maxWriteID) - .statementId(-1) + .statementId(statementId) .finalDestination(partitionPath)); } + /** + * Returns the file that would be used to store rows under this. + * parameters + * @param partitionValues partition values + * @param bucketId bucket id + * @param minWriteId min write Id + * @param maxWriteId max write Id + * @param statementId statement Id + * @param table table + * @return the location of the file. 
+ * @throws StreamingException when the path is not found + */ + @Override + public Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId, + Table table) throws StreamingException { + Path destLocation; + if (partitionValues == null) { + destLocation = new Path(table.getSd().getLocation()); + } else { + Map partSpec = Warehouse.makeSpecFromValues( + table.getPartitionKeys(), partitionValues); + try { + destLocation = new Path(table.getDataLocation(), Warehouse.makePartPath(partSpec)); + } catch (MetaException e) { + throw new StreamingException("Unable to retrieve the delta file location" + + " for values: " + partitionValues + + ", minWriteId: " + minWriteId + + ", maxWriteId: " + maxWriteId + + ", statementId: " + statementId, e); + } + } + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .filesystem(fs) + .inspector(outputRowObjectInspector) + .bucket(bucketId) + .minimumWriteId(minWriteId) + .maximumWriteId(maxWriteId) + .statementId(statementId) + .finalDestination(destLocation); + return AcidUtils.createFilename(destLocation, options); + } + protected RecordUpdater getRecordUpdater(List partitionValues, int bucketId) throws StreamingIOFailure { RecordUpdater recordUpdater; String key; @@ -516,12 +571,10 @@ protected RecordUpdater getRecordUpdater(List partitionValues, int bucke // partitions to TxnHandler if (!partitionInfo.isExists()) { addedPartitions.add(partitionInfo.getName()); - if (LOG.isDebugEnabled()) { - LOG.debug("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName); - } } else { if (LOG.isDebugEnabled()) { - LOG.debug("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName); + LOG.debug("Partition {} already exists for table {}", + partitionInfo.getName(), fullyQualifiedTableName); } } destLocation = new Path(partitionInfo.getPartitionLocation()); diff --git streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java new file mode 100644 index 0000000000..a99fdba2c1 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; + +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Common methods for the implementing classes. + */ +abstract class AbstractStreamingTransaction + implements StreamingTransaction { + + /** + * This variable should be initialized by the children. 
+ */ + protected RecordWriter recordWriter; + + /** + * This variable should be initialized by the children. + */ + protected List txnToWriteIds; + + + /** + * once any operation on this batch encounters a system exception + * (e.g. IOException on write) it's safest to assume that we can't write to the + * file backing this batch any more. This guards important public methods + */ + protected final AtomicBoolean isTxnClosed = new AtomicBoolean(false); + + protected int currentTxnIndex = -1; + protected HiveStreamingConnection.TxnState state; + + + protected void checkIsClosed() throws StreamingException { + if (isTxnClosed.get()) { + throw new StreamingException("Transaction" + toString() + " is closed()"); + } + } + + protected void beginNextTransactionImpl(String errorMessage) + throws StreamingException{ + state = HiveStreamingConnection.TxnState.INACTIVE; //clear state from previous txn + if ((currentTxnIndex + 1) >= txnToWriteIds.size()) { + throw new InvalidTransactionState(errorMessage); + } + currentTxnIndex++; + state = HiveStreamingConnection.TxnState.OPEN; + } + + public void write(final byte[] record) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), record); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won't help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } finally { + markDead(success); + } + } + + public void write(final InputStream inputStream) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), inputStream); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won't help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } finally { + markDead(success); + } + } + + /** + * A transaction batch opens a single HDFS file and writes multiple transactions to it. If there is any issue + * with the write, we can't continue to write to the same file any more as it may be corrupted now (at the tail). + * This ensures that a client can't ignore these failures and continue to write.
+ */ + protected void markDead(boolean success) throws StreamingException { + if (success) { + return; + } + close(); + } + + public long getCurrentWriteId() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.get(currentTxnIndex).getWriteId(); + } + return -1L; + } + + public int remainingTransactions() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.size() - currentTxnIndex - 1; + } + return txnToWriteIds.size(); + } + + public boolean isClosed() { + return isTxnClosed.get(); + } + + public HiveStreamingConnection.TxnState getCurrentTransactionState() { + return state; + } + + public long getCurrentTxnId() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.get(currentTxnIndex).getTxnId(); + } + return -1L; + } + + @Override + public List getTxnToWriteIds() { + return txnToWriteIds; + } + + public void commit() throws StreamingException { + commitWithPartitions(null); + } +} diff --git streaming/src/java/org/apache/hive/streaming/ConnectionStats.java streaming/src/java/org/apache/hive/streaming/ConnectionStats.java index 355456eb1e..c0e7d5c296 100644 --- streaming/src/java/org/apache/hive/streaming/ConnectionStats.java +++ streaming/src/java/org/apache/hive/streaming/ConnectionStats.java @@ -31,6 +31,16 @@ private LongAdder autoFlushCount = new LongAdder(); private LongAdder metastoreCalls = new LongAdder(); + /** + * Total partitions that have been affected. + */ + private LongAdder totalPartitions = new LongAdder(); + + /** + * Number of partitions that were created. + */ + private LongAdder createdPartitions = new LongAdder(); + public void incrementRecordsWritten() { recordsWritten.increment(); } @@ -55,6 +65,22 @@ public void incrementRecordsSize(long delta) { recordsSize.add(delta); } + /** + * Increment by delta the number of created partitions. + * @param delta to increment by. + */ + public void incrementCreatedPartitions(long delta) { + createdPartitions.add(delta); + } + + /** + * Increment by delta the total partitions. + * @param delta to increment by.
+ */ + public void incrementTotalPartitions(long delta) { + totalPartitions.add(delta); + } + public long getRecordsWritten() { return recordsWritten.longValue(); } @@ -79,10 +105,20 @@ public long getMetastoreCalls() { return metastoreCalls.longValue(); } + public LongAdder getTotalPartitions() { + return totalPartitions; + } + + public LongAdder getCreatedPartitions() { + return createdPartitions; + } + @Override public String toString() { return "{records-written: " + recordsWritten + ", records-size: "+ recordsSize + ", committed-transactions: " + committedTransactions + ", aborted-transactions: " + abortedTransactions + ", auto-flushes: " + autoFlushCount + - ", metastore-calls: " + metastoreCalls + " }"; + ", metastore-calls: " + metastoreCalls + + ", created-partitions: " + createdPartitions + + ", total-partitions: " + totalPartitions + " }"; } } diff --git streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 6cf14b064f..f79b844826 100644 --- streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -23,43 +23,27 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.LockComponentBuilder; -import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; @@ -71,7 +55,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import 
com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Streaming connection implementation for hive. To create a streaming connection, use the builder API @@ -120,11 +103,11 @@ private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083"; private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1; - private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000; private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true; public enum TxnState { - INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"); + INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"), + PREPARED_FOR_COMMIT("P"); private final String code; @@ -144,7 +127,7 @@ public String toString() { private String agentInfo; private int transactionBatchSize; private RecordWriter recordWriter; - private TransactionBatch currentTransactionBatch; + private StreamingTransaction currentTransactionBatch; private HiveConf conf; private boolean streamingOptimizations; private AtomicBoolean isConnectionClosed = new AtomicBoolean(false); @@ -158,6 +141,11 @@ public String toString() { private Table tableObject = null; private String metastoreUri; private ConnectionStats connectionStats; + private final Long writeId; + private final Integer statementId; + private boolean manageTransactions; + private int countTransactions = 0; + private Set partitions; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -166,6 +154,12 @@ private HiveStreamingConnection(Builder builder) throws StreamingException { this.conf = builder.hiveConf; this.agentInfo = builder.agentInfo; this.streamingOptimizations = builder.streamingOptimizations; + this.writeId = builder.writeId; + this.statementId = builder.statementId; + this.tableObject = builder.tableObject; + this.setPartitionedTable(builder.isPartitioned); + this.manageTransactions = builder.manageTransactions; + UserGroupInformation loggedInUser = null; try { loggedInUser = UserGroupInformation.getLoginUser(); @@ -193,13 +187,18 @@ private HiveStreamingConnection(Builder builder) throws StreamingException { if (conf == null) { conf = createHiveConf(this.getClass(), DEFAULT_METASTORE_URI); } + overrideConfSettings(conf); - this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()); - this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection"); - // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are - // isolated from the other transaction related RPC calls. - this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection-heartbeat"); - validateTable(); + if (manageTransactions) { + this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()); + this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, + "streaming-connection"); + // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are + // isolated from the other transaction related RPC calls. 
+ this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, + "streaming-connection-heartbeat"); + validateTable(); + } LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString()); } @@ -217,6 +216,11 @@ public static Builder newBuilder() { private int transactionBatchSize = DEFAULT_TRANSACTION_BATCH_SIZE; private boolean streamingOptimizations = DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED; private RecordWriter recordWriter; + private long writeId = -1; + private int statementId = -1; + private boolean manageTransactions = true; + private Table tableObject; + private boolean isPartitioned; /** * Specify database to use for streaming connection. @@ -314,6 +318,44 @@ public Builder withRecordWriter(final RecordWriter recordWriter) { return this; } + /** + * Specify this parameter if we want the current connection + * to join an ongoing transaction without having to query + * the metastore to create it. + * @param writeId write id + * @return builder + */ + public Builder withWriteId(final long writeId) { + this.writeId = writeId; + manageTransactions = false; + return this; + } + + /** + * Specify this parameter to set a statement id in the writer. + * This really only makes sense to be specified when a writeId is + * provided as well. + * @param statementId statement id + * @return builder + */ + public Builder withStatementId(final int statementId) { + this.statementId = statementId; + return this; + } + + /** + * Specify the table object since sometimes no connections + * to the metastore will be opened. + * @param table table object. + * @return builder + */ + public Builder withTableObject(Table table) { + this.tableObject = table; + this.isPartitioned = tableObject.getPartitionKeys() != null + && !tableObject.getPartitionKeys().isEmpty(); + return this; + } + /** * Returning a streaming connection to hive.
* @@ -324,11 +366,27 @@ public HiveStreamingConnection connect() throws StreamingException { throw new StreamingException("Database cannot be null for streaming connection"); } if (table == null) { - throw new StreamingException("Table cannot be null for streaming connection"); + if (tableObject == null) { + throw new StreamingException("Table and table object cannot be " + + "null for streaming connection"); + } else { + table = tableObject.getTableName(); + } + } + + if (tableObject != null && !tableObject.getTableName().equals(table)) { + throw new StreamingException("Table must match tableObject table name"); } + if (recordWriter == null) { throw new StreamingException("Record writer cannot be null for streaming connection"); } + if ((writeId != -1 && tableObject == null) || + (writeId == -1 && tableObject != null)){ + throw new StreamingException("If writeId is set, tableObject " + + "must be set as well and vice versa"); + } + HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this); // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before // filesystem close (to avoid ClosedChannelException) @@ -338,7 +396,7 @@ public HiveStreamingConnection connect() throws StreamingException { } } - private void setPartitionedTable(boolean isPartitionedTable) { + private void setPartitionedTable(Boolean isPartitionedTable) { this.isPartitionedTable = isPartitionedTable; } @@ -356,7 +414,9 @@ private String toConnectionInfoString() { "username: " + username + ", " + "secure-mode: " + secureMode + ", " + "record-writer: " + recordWriter.getClass().getSimpleName() + ", " + - "agent-info: " + agentInfo + " }"; + "agent-info: " + agentInfo + ", " + + "writeId: " + writeId + ", " + + "statementId: " + statementId + " }"; } @VisibleForTesting @@ -369,6 +429,7 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu String partLocation = null; String partName = null; boolean exists = false; + try { Map partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues); AddPartitionDesc addPartitionDesc = new AddPartitionDesc(database, table, true); @@ -376,7 +437,18 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString(); addPartitionDesc.addPartition(partSpec, partLocation); Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf); + + if (getMSC() == null) { + // We assume it doesn't exist if we can't check it + // so the driver will decide + return new PartitionInfo(partName, partLocation, false); + } + getMSC().add_partition(partition); + if (LOG.isDebugEnabled()) { + LOG.debug("Created partition {} for table {}", partName, + tableObject.getFullyQualifiedName()); + } } catch (AlreadyExistsException e) { exists = true; } catch (HiveException | TException e) { @@ -386,6 +458,25 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu return new PartitionInfo(partName, partLocation, exists); } + /** + * Returns the file that would be used to store rows under this. + * parameters + * @param partitionValues partition values + * @param bucketId bucket id + * @param minWriteId min write Id + * @param maxWriteId max write Id + * @param statementId statement Id + * @return the location of the file. 
+ * @throws StreamingException when the path is not found + */ + @Override + public Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId) + throws StreamingException { + return recordWriter.getDeltaFileLocation(partitionValues, + bucketId, minWriteId, maxWriteId, statementId, tableObject); + } + IMetaStoreClient getMSC() { connectionStats.incrementMetastoreCalls(); return msClient; @@ -424,43 +515,6 @@ private void validateTable() throws InvalidTable, ConnectionError { } } - private static class HeartbeatRunnable implements Runnable { - private final HiveStreamingConnection conn; - private final AtomicLong minTxnId; - private final long maxTxnId; - private final ReentrantLock transactionLock; - private final AtomicBoolean isTxnClosed; - - HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId, - final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) { - this.conn = conn; - this.minTxnId = minTxnId; - this.maxTxnId = maxTxnId; - this.transactionLock = transactionLock; - this.isTxnClosed = isTxnClosed; - } - - @Override - public void run() { - transactionLock.lock(); - try { - if (minTxnId.get() > 0) { - HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId); - if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { - LOG.error("Heartbeat failure: {}", resp.toString()); - isTxnClosed.set(true); - } else { - LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId); - } - } - } catch (TException e) { - LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e); - } finally { - transactionLock.unlock(); - } - } - } - private void beginNextTransaction() throws StreamingException { if (currentTransactionBatch == null) { currentTransactionBatch = createNewTransactionBatch(); @@ -481,8 +535,18 @@ private void beginNextTransaction() throws StreamingException { currentTransactionBatch.beginNextTransaction(); } - private TransactionBatch createNewTransactionBatch() throws StreamingException { - return new TransactionBatch(this); + private StreamingTransaction createNewTransactionBatch() throws StreamingException { + countTransactions++; + if (manageTransactions) { + return new TransactionBatch(this); + } else { + if (countTransactions > 1) { + throw new StreamingException("If a writeId is passed for the " + + "construction of HiveStreaming only one transaction batch" + + " can be done"); + } + return new UnManagedSingleTransaction(this); + } } private void checkClosedState() throws StreamingException { @@ -496,7 +560,7 @@ private void checkState() throws StreamingException { if (currentTransactionBatch == null) { throw new StreamingException("Transaction batch is null. Missing beginTransaction?"); } - if (currentTransactionBatch.state != TxnState.OPEN) { + if (currentTransactionBatch.getCurrentTransactionState() != TxnState.OPEN) { throw new StreamingException("Transaction state is not OPEN. 
Missing beginTransaction?"); } } @@ -504,13 +568,40 @@ private void checkState() throws StreamingException { @Override public void beginTransaction() throws StreamingException { checkClosedState(); + partitions = new HashSet<>(); beginNextTransaction(); } @Override public void commitTransaction() throws StreamingException { + commitTransactionWithPartition(null); + } + + @Override + public void commitTransactionWithPartition(Set partitions) + throws StreamingException { checkState(); - currentTransactionBatch.commit(); + + Set createdPartitions = new HashSet<>(); + if (partitions != null) { + for (String partition: partitions) { + try { + PartitionInfo info = createPartitionIfNotExists( + Warehouse.getPartValuesFromPartName(partition)); + if (!info.isExists()) { + createdPartitions.add(partition); + } + } catch (MetaException e) { + throw new StreamingException("Partition " + partition + " is invalid.", e); + } + } + connectionStats.incrementTotalPartitions(partitions.size()); + } + + currentTransactionBatch.commitWithPartitions(createdPartitions); + this.partitions.addAll( + currentTransactionBatch.getPartitions()); + connectionStats.incrementCreatedPartitions(createdPartitions.size()); connectionStats.incrementCommittedTransactions(); } @@ -549,8 +640,10 @@ public void close() { } catch (StreamingException e) { LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e); } finally { - getMSC().close(); - getHeatbeatMSC().close(); + if (manageTransactions) { + getMSC().close(); + getHeatbeatMSC().close(); + } } if (LOG.isInfoEnabled()) { LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats()); @@ -595,418 +688,6 @@ Long getCurrentTxnId() { return currentTransactionBatch.getCurrentTxnId(); } - private static class TransactionBatch { - private String username; - private HiveStreamingConnection conn; - private ScheduledExecutorService scheduledExecutorService; - private RecordWriter recordWriter; - private String partNameForLock = null; - private List txnToWriteIds; - private int currentTxnIndex = -1; - private TxnState state; - private LockRequest lockRequest = null; - // heartbeats can only be sent for open transactions. - // there is a race between committing/aborting a transaction and heartbeat. - // Example: If a heartbeat is sent for committed txn, exception will be thrown. - // Similarly if we don't send a heartbeat, metastore server might abort a txn - // for missed heartbeat right before commit txn call. - // This lock is used to mutex commit/abort and heartbeat calls - private final ReentrantLock transactionLock = new ReentrantLock(); - // min txn id is incremented linearly within a transaction batch. - // keeping minTxnId atomic as it is shared with heartbeat thread - private final AtomicLong minTxnId; - // max txn id does not change for a transaction batch - private final long maxTxnId; - - /** - * once any operation on this batch encounters a system exception - * (e.g. IOException on write) it's safest to assume that we can't write to the - * file backing this batch any more. 
This guards important public methods - */ - private final AtomicBoolean isTxnClosed = new AtomicBoolean(false); - private String agentInfo; - private int numTxns; - /** - * Tracks the state of each transaction - */ - private TxnState[] txnStatus; - /** - * ID of the last txn used by {@link #beginNextTransactionImpl()} - */ - private long lastTxnUsed; - - /** - * Represents a batch of transactions acquired from MetaStore - * - * @param conn - hive streaming connection - * @throws StreamingException if failed to create new RecordUpdater for batch - */ - private TransactionBatch(HiveStreamingConnection conn) throws StreamingException { - boolean success = false; - try { - if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) { - List partKeys = conn.tableObject.getPartitionKeys(); - partNameForLock = Warehouse.makePartName(partKeys, conn.staticPartitionValues); - } - this.conn = conn; - this.username = conn.username; - this.recordWriter = conn.recordWriter; - this.agentInfo = conn.agentInfo; - this.numTxns = conn.transactionBatchSize; - - setupHeartBeatThread(); - - List txnIds = openTxnImpl(username, numTxns); - txnToWriteIds = allocateWriteIdsImpl(txnIds); - assert (txnToWriteIds.size() == numTxns); - - txnStatus = new TxnState[numTxns]; - for (int i = 0; i < txnStatus.length; i++) { - assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i)); - txnStatus[i] = TxnState.OPEN; //Open matches Metastore state - } - this.state = TxnState.INACTIVE; - - // initialize record writer with connection and write id info - recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns - 1).getWriteId()); - this.minTxnId = new AtomicLong(txnIds.get(0)); - this.maxTxnId = txnIds.get(txnIds.size() - 1); - success = true; - } catch (TException e) { - throw new StreamingException(conn.toString(), e); - } finally { - //clean up if above throws - markDead(success); - } - } - - private void setupHeartBeatThread() { - // start heartbeat thread - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("HiveStreamingConnection-Heartbeat-Thread") - .build(); - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - long heartBeatInterval; - long initialDelay; - try { - // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2 - heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.conf); - } catch (LockException e) { - heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL; - } - // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager) - initialDelay = (long) (heartBeatInterval * 0.75 * Math.random()); - LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", - heartBeatInterval, initialDelay, conn.agentInfo); - Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed); - this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit - .MILLISECONDS); - } - - private List openTxnImpl(final String user, final int numTxns) throws TException { - return conn.getMSC().openTxns(user, numTxns).getTxn_ids(); - } - - private List allocateWriteIdsImpl(final List txnIds) throws TException { - return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.database, conn.table); - } - - @Override - public String toString() { - if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { - return "{}"; - } - StringBuilder sb = new 
StringBuilder(" TxnStatus["); - for (TxnState state : txnStatus) { - //'state' should not be null - future proofing - sb.append(state == null ? "N" : state); - } - sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); - return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId() - + "/" + txnToWriteIds.get(0).getWriteId() - + "..." - + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId() - + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId() - + "] on connection = " + conn + "; " + sb; - } - - private void beginNextTransaction() throws StreamingException { - checkIsClosed(); - beginNextTransactionImpl(); - } - - private void beginNextTransactionImpl() throws TransactionError { - state = TxnState.INACTIVE;//clear state from previous txn - if ((currentTxnIndex + 1) >= txnToWriteIds.size()) { - throw new InvalidTransactionState("No more transactions available in" + - " next batch for connection: " + conn + " user: " + username); - } - currentTxnIndex++; - state = TxnState.OPEN; - lastTxnUsed = getCurrentTxnId(); - lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo); - try { - LockResponse res = conn.getMSC().lock(lockRequest); - if (res.getState() != LockState.ACQUIRED) { - throw new TransactionError("Unable to acquire lock on " + conn); - } - } catch (TException e) { - throw new TransactionError("Unable to acquire lock on " + conn, e); - } - } - - long getCurrentTxnId() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.get(currentTxnIndex).getTxnId(); - } - return -1L; - } - - long getCurrentWriteId() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.get(currentTxnIndex).getWriteId(); - } - return -1L; - } - - TxnState getCurrentTransactionState() { - return state; - } - - int remainingTransactions() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.size() - currentTxnIndex - 1; - } - return txnToWriteIds.size(); - } - - - public void write(final byte[] record) throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - recordWriter.write(getCurrentWriteId(), record); - success = true; - } catch (SerializationError ex) { - //this exception indicates that a {@code record} could not be parsed and the - //caller can decide whether to drop it or send it to dead letter queue. - //rolling back the txn and retrying won't help since the tuple will be exactly the same - //when it's replayed. - success = true; - throw ex; - } finally { - markDead(success); - } - } - - public void write(final InputStream inputStream) throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - recordWriter.write(getCurrentWriteId(), inputStream); - success = true; - } catch (SerializationError ex) { - //this exception indicates that a {@code record} could not be parsed and the - //caller can decide whether to drop it or send it to dead letter queue. - //rolling back the txn and retrying won'table help since the tuple will be exactly the same - //when it's replayed. - success = true; - throw ex; - } finally { - markDead(success); - } - } - - private void checkIsClosed() throws StreamingException { - if (isTxnClosed.get()) { - throw new StreamingException("Transaction" + toString() + " is closed()"); - } - } - - /** - * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue - * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail). 
- * This ensures that a client can't ignore these failures and continue to write. - */ - private void markDead(boolean success) throws StreamingException { - if (success) { - return; - } - close(); - } - - - void commit() throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - commitImpl(); - success = true; - } finally { - markDead(success); - } - } - - private void commitImpl() throws StreamingException { - try { - recordWriter.flush(); - TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); - if (conn.isDynamicPartitioning()) { - List partNames = new ArrayList<>(recordWriter.getPartitions()); - conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table, - partNames, DataOperationType.INSERT); - } - transactionLock.lock(); - try { - conn.getMSC().commitTxn(txnToWriteId.getTxnId()); - // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. - // the current transaction is going to committed or fail, so don't need heartbeat for current transaction. - if (currentTxnIndex + 1 < txnToWriteIds.size()) { - minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); - } else { - // exhausted the batch, no longer have to heartbeat for current txn batch - minTxnId.set(-1); - } - } finally { - transactionLock.unlock(); - } - state = TxnState.COMMITTED; - txnStatus[currentTxnIndex] = TxnState.COMMITTED; - } catch (NoSuchTxnException e) { - throw new TransactionError("Invalid transaction id : " - + getCurrentTxnId(), e); - } catch (TxnAbortedException e) { - throw new TransactionError("Aborted transaction cannot be committed" - , e); - } catch (TException e) { - throw new TransactionError("Unable to commitTransaction transaction" - + getCurrentTxnId(), e); - } - } - - void abort() throws StreamingException { - if (isTxnClosed.get()) { - /* - * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all - * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction() - * error doesn't get misleading errors - */ - return; - } - abort(false); - } - - private void abort(final boolean abortAllRemaining) throws StreamingException { - abortImpl(abortAllRemaining); - } - - private void abortImpl(boolean abortAllRemaining) throws StreamingException { - if (minTxnId == null) { - return; - } - - transactionLock.lock(); - try { - if (abortAllRemaining) { - // we are aborting all txns in the current batch, so no need to heartbeat - minTxnId.set(-1); - //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn - //so we need to start from next one, if any. Also if batch was created but - //fetchTransactionBatch() was never called, we want to start with first txn - int minOpenTxnIndex = Math.max(currentTxnIndex + - (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); - for (currentTxnIndex = minOpenTxnIndex; - currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) { - conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); - txnStatus[currentTxnIndex] = TxnState.ABORTED; - } - currentTxnIndex--;//since the loop left it == txnToWriteIds.size() - } else { - // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat - // if the current txn is last in the batch. 
- if (currentTxnIndex + 1 < txnToWriteIds.size()) { - minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); - } else { - // exhausted the batch, no longer have to heartbeat - minTxnId.set(-1); - } - long currTxnId = getCurrentTxnId(); - if (currTxnId > 0) { - conn.getMSC().rollbackTxn(currTxnId); - txnStatus[currentTxnIndex] = TxnState.ABORTED; - } - } - state = TxnState.ABORTED; - } catch (NoSuchTxnException e) { - throw new TransactionError("Unable to abort invalid transaction id : " - + getCurrentTxnId(), e); - } catch (TException e) { - throw new TransactionError("Unable to abort transaction id : " - + getCurrentTxnId(), e); - } finally { - transactionLock.unlock(); - } - } - - public boolean isClosed() { - return isTxnClosed.get(); - } - - /** - * Close the TransactionBatch. This will abort any still open txns in this batch. - * - * @throws StreamingException - failure when closing transaction batch - */ - public void close() throws StreamingException { - if (isTxnClosed.get()) { - return; - } - isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async - try { - abort(true);//abort all remaining txns - } catch (Exception ex) { - LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); - throw new StreamingException("Unable to abort", ex); - } - try { - closeImpl(); - } catch (Exception ex) { - LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); - throw new StreamingException("Unable to close", ex); - } - } - - private void closeImpl() throws StreamingException { - state = TxnState.INACTIVE; - recordWriter.close(); - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - } - } - - static LockRequest createLockRequest(final HiveStreamingConnection connection, - String partNameForLock, String user, long txnId, String agentInfo) { - LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo); - requestBuilder.setUser(user); - requestBuilder.setTransactionId(txnId); - - LockComponentBuilder lockCompBuilder = new LockComponentBuilder() - .setDbName(connection.database) - .setTableName(connection.table) - .setShared() - .setOperationType(DataOperationType.INSERT); - if (connection.isDynamicPartitioning()) { - lockCompBuilder.setIsDynamicPartitionWrite(true); - } - if (partNameForLock != null && !partNameForLock.isEmpty()) { - lockCompBuilder.setPartitionName(partNameForLock); - } - requestBuilder.addLockComponent(lockCompBuilder.build()); - - return requestBuilder.build(); - } - } - private HiveConf createHiveConf(Class clazz, String metaStoreUri) { HiveConf conf = new HiveConf(clazz); if (metaStoreUri != null) { @@ -1049,6 +730,13 @@ private static void setHiveConf(HiveConf conf, String var) { conf.setBoolean(var, true); } + public List getTxnToWriteIds() { + if (currentTransactionBatch != null) { + return currentTransactionBatch.getTxnToWriteIds(); + } + return null; + } + @Override public HiveConf getHiveConf() { return conf; @@ -1083,4 +771,41 @@ public boolean isPartitionedTable() { public boolean isDynamicPartitioning() { return isPartitionedTable() && (staticPartitionValues == null || staticPartitionValues.isEmpty()); } + + @Override + public Set getPartitions() { + return partitions; + } + + public String getUsername() { + return username; + } + + public String getDatabase() { + return database; + } + + public RecordWriter getRecordWriter() { + return recordWriter; + } + + public int getTransactionBatchSize() { + return transactionBatchSize; + } 
+ + public HiveConf getConf() { + return conf; + } + + public Long getWriteId() { + return writeId; + } + + public Integer getStatementId() { + return statementId; + } + + public Long getCurrentWriteId() { + return currentTransactionBatch.getCurrentWriteId(); + } } diff --git streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java index 9d92dfa516..d076ce7961 100644 --- streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java +++ streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java @@ -18,8 +18,11 @@ package org.apache.hive.streaming; +/** + * Invalid transaction. + */ public class InvalidTransactionState extends TransactionError { - InvalidTransactionState(String msg) { + public InvalidTransactionState(String msg) { super(msg); } } diff --git streaming/src/java/org/apache/hive/streaming/PartitionInfo.java streaming/src/java/org/apache/hive/streaming/PartitionInfo.java index ce9f76a6b0..d16a7a5cec 100644 --- streaming/src/java/org/apache/hive/streaming/PartitionInfo.java +++ streaming/src/java/org/apache/hive/streaming/PartitionInfo.java @@ -26,6 +26,7 @@ private String partitionLocation; private boolean exists; + public PartitionInfo(final String name, final String partitionLocation, final boolean exists) { this.name = name; this.partitionLocation = partitionLocation; diff --git streaming/src/java/org/apache/hive/streaming/RecordWriter.java streaming/src/java/org/apache/hive/streaming/RecordWriter.java index d9c4455e61..5b027548a9 100644 --- streaming/src/java/org/apache/hive/streaming/RecordWriter.java +++ streaming/src/java/org/apache/hive/streaming/RecordWriter.java @@ -19,8 +19,12 @@ package org.apache.hive.streaming; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.metadata.Table; + import java.io.InputStream; +import java.util.List; import java.util.Set; public interface RecordWriter { @@ -35,6 +39,20 @@ */ void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException; + /** + * Initialize record writer. + * + * @param connection - streaming connection + * @param minWriteId - min write id + * @param maxWriteID - max write id + * @param statementId - statement id. Note this number can't be bigger than 2^12 + * @throws StreamingException - thrown when initialization failed + */ + default void init(StreamingConnection connection, long minWriteId, + long maxWriteID, int statementId) throws StreamingException { + init(connection, minWriteId, maxWriteID); + } + /** * Writes using a hive RecordUpdater. * @@ -69,9 +87,27 @@ void close() throws StreamingException; /** - * Get the set of partitions that were added by the record writer. + * Get the set of partitions that were used but may or may not have + * been added to the metastore. * * @return - set of partitions */ Set getPartitions(); + + /** + * Returns the location of the delta directory.
+ * @param partitionValues partition values + * @param bucketId bucket id + * @param minWriteId min write Id + * @param maxWriteId max write Id + * @param statementId statement Id + * @param table table + * @return the location of the file + * @throws StreamingException when the path is not found + */ + default Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId, + Table table) throws StreamingException { + throw new UnsupportedOperationException(); + } } diff --git streaming/src/java/org/apache/hive/streaming/StreamingConnection.java streaming/src/java/org/apache/hive/streaming/StreamingConnection.java index fbe00db1f7..92016e5f68 100644 --- streaming/src/java/org/apache/hive/streaming/StreamingConnection.java +++ streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -19,9 +19,14 @@ package org.apache.hive.streaming; import java.io.InputStream; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import javax.annotation.Nullable; + public interface StreamingConnection extends ConnectionInfo, PartitionHandler { /** * Returns hive configuration object used during connection creation. @@ -60,6 +65,18 @@ */ void commitTransaction() throws StreamingException; + /** + * Commit a transaction to make the writes visible for readers. Include + * other partitions that may have been added independently. + * + * @param partitions - extra partitions to commit. + * @throws StreamingException - if there are errors when committing the open transaction. + */ + default void commitTransactionWithPartition(@Nullable Set partitions) + throws StreamingException { + throw new UnsupportedOperationException(); + } + /** * Manually abort the opened transaction. * @@ -78,4 +95,30 @@ * @return - connection stats */ ConnectionStats getConnectionStats(); + + /** + * Get the partitions used during the streaming. These partitions haven't + * been committed to the metastore. + * @return partitions. + */ + default Set getPartitions() { + throw new UnsupportedOperationException(); + } + + /** + * Returns the file that would be used by the writer to write the rows, + * given the parameters. + * @param partitionValues partition values + * @param bucketId bucket id + * @param minWriteId min write Id + * @param maxWriteId max write Id + * @param statementId statement Id + * @return the location of the file. + * @throws StreamingException when the path is not found + */ + default Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId) + throws StreamingException { + throw new UnsupportedOperationException(); + } } diff --git streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java new file mode 100644 index 0000000000..83b2f15df3 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import java.io.InputStream; +import java.util.List; +import java.util.Set; + +/** + * Common interface for transactions in HiveStreamingConnection. + */ +public interface StreamingTransaction { + /** + * Get ready for the next transaction. + * @throws StreamingException + */ + void beginNextTransaction() throws StreamingException; + + /** + * Commit the transaction. + * @throws StreamingException + */ + void commit() throws StreamingException; + + /** + * Commit the transaction and send to the metastore the partitions + * created in the process. + * @param partitions to commit. + * @throws StreamingException + */ + void commitWithPartitions(Set partitions) throws StreamingException; + + /** + * Abort a transaction. + * @throws StreamingException + */ + void abort() throws StreamingException; + + /** + * Write data within a transaction. This expects beginNextTransaction + * to have been called before this and commit to be called after. + * @param record bytes to write. + * @throws StreamingException + */ + void write(byte[] record) throws StreamingException; + + /** + * Write data within a transaction. + * @param stream stream to write. + * @throws StreamingException + */ + void write(InputStream stream) throws StreamingException; + + /** + * Free/close resources used by the streaming transaction. + * @throws StreamingException + */ + void close() throws StreamingException; + + /** + * @return true if closed. + */ + boolean isClosed(); + + /** + * @return the state of the current transaction. + */ + HiveStreamingConnection.TxnState getCurrentTransactionState(); + + /** + * @return remaining number of transactions + */ + int remainingTransactions(); + + /** + * @return the current transaction id being used + */ + long getCurrentTxnId(); + + /** + * @return the current write id being used + */ + long getCurrentWriteId(); + + /** + * Get the partitions that were used in this transaction. They may or may + * not have been created. + * @return list of partitions + */ + Set getPartitions(); + + /** + * @return the pairs of transaction ids <--> write ids + */ + List getTxnToWriteIds(); +} diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatch.java streaming/src/java/org/apache/hive/streaming/TransactionBatch.java new file mode 100644 index 0000000000..dabbe2110e --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/TransactionBatch.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Streaming transaction to use most of the times. Will query the + * metastore to get the transaction ids and the writer ids and then + * will commit them. + */ +public class TransactionBatch extends AbstractStreamingTransaction { + private static final Logger LOG = LoggerFactory.getLogger( + TransactionBatch.class.getName()); + private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000; + protected Set createdPartitions = null; + private String username; + private HiveStreamingConnection conn; + private ScheduledExecutorService scheduledExecutorService; + private String partNameForLock = null; + private LockRequest lockRequest = null; + // heartbeats can only be sent for open transactions. + // there is a race between committing/aborting a transaction and heartbeat. + // Example: If a heartbeat is sent for committed txn, exception will be thrown. + // Similarly if we don't send a heartbeat, metastore server might abort a txn + // for missed heartbeat right before commit txn call. + // This lock is used to mutex commit/abort and heartbeat calls + private final ReentrantLock transactionLock = new ReentrantLock(); + // min txn id is incremented linearly within a transaction batch. + // keeping minTxnId atomic as it is shared with heartbeat thread + private final AtomicLong minTxnId; + // max txn id does not change for a transaction batch + private final long maxTxnId; + + private String agentInfo; + private int numTxns; + /** + * Tracks the state of each transaction. 
+ */ + private HiveStreamingConnection.TxnState[] txnStatus; + /** + * ID of the last txn used by {@link #beginNextTransactionImpl()}. + */ + private long lastTxnUsed; + + /** + * Represents a batch of transactions acquired from MetaStore. + * + * @param conn - hive streaming connection + * @throws StreamingException if failed to create new RecordUpdater for batch + */ + public TransactionBatch(HiveStreamingConnection conn) throws StreamingException { + boolean success = false; + try { + if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) { + List partKeys = conn.getTable().getPartitionKeys(); + partNameForLock = Warehouse.makePartName(partKeys, conn.getStaticPartitionValues()); + } + this.conn = conn; + this.username = conn.getUsername(); + this.recordWriter = conn.getRecordWriter(); + this.agentInfo = conn.getAgentInfo(); + this.numTxns = conn.getTransactionBatchSize(); + + setupHeartBeatThread(); + + List txnIds = openTxnImpl(username, numTxns); + txnToWriteIds = allocateWriteIdsImpl(txnIds); + assert (txnToWriteIds.size() == numTxns); + + txnStatus = new HiveStreamingConnection.TxnState[numTxns]; + for (int i = 0; i < txnStatus.length; i++) { + assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i)); + txnStatus[i] = HiveStreamingConnection.TxnState.OPEN; //Open matches Metastore state + } + this.state = HiveStreamingConnection.TxnState.INACTIVE; + + // initialize record writer with connection and write id info + recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), + txnToWriteIds.get(numTxns - 1).getWriteId(), conn.getStatementId()); + this.minTxnId = new AtomicLong(txnIds.get(0)); + this.maxTxnId = txnIds.get(txnIds.size() - 1); + success = true; + } catch (TException e) { + throw new StreamingException(conn.toString(), e); + } finally { + //clean up if above throws + markDead(success); + } + } + + private static class HeartbeatRunnable implements Runnable { + private final HiveStreamingConnection conn; + private final AtomicLong minTxnId; + private final long maxTxnId; + private final ReentrantLock transactionLock; + private final AtomicBoolean isTxnClosed; + + HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId, + final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) { + this.conn = conn; + this.minTxnId = minTxnId; + this.maxTxnId = maxTxnId; + this.transactionLock = transactionLock; + this.isTxnClosed = isTxnClosed; + } + + @Override + public void run() { + transactionLock.lock(); + try { + if (minTxnId.get() > 0) { + HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId); + if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { + LOG.error("Heartbeat failure: {}", resp.toString()); + isTxnClosed.set(true); + } else { + LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId); + } + } + } catch (TException e) { + LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e); + } finally { + transactionLock.unlock(); + } + } + } + + private void setupHeartBeatThread() { + // start heartbeat thread + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("HiveStreamingConnection-Heartbeat-Thread") + .build(); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + long heartBeatInterval; + long initialDelay; + try { + // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2 + heartBeatInterval 
= DbTxnManager.getHeartbeatInterval(conn.getConf()); + } catch (LockException e) { + heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL; + } + // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager) + initialDelay = (long) (heartBeatInterval * 0.75 * Math.random()); + LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", + heartBeatInterval, initialDelay, conn.getAgentInfo()); + Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed); + this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit + .MILLISECONDS); + } + + private List openTxnImpl(final String user, final int numTxns) throws TException { + return conn.getMSC().openTxns(user, numTxns).getTxn_ids(); + } + + private List allocateWriteIdsImpl(final List txnIds) throws TException { + return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.getDatabase(), + conn.getTable().getTableName()); + } + + @Override + public String toString() { + if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { + return "{}"; + } + StringBuilder sb = new StringBuilder(" TxnStatus["); + for (HiveStreamingConnection.TxnState state : txnStatus) { + //'state' should not be null - future proofing + sb.append(state == null ? "N" : state); + } + sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); + return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId() + + "/" + txnToWriteIds.get(0).getWriteId() + + "..." + + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId() + + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId() + + "] on connection = " + conn + "; " + sb; + } + + public void beginNextTransaction() throws StreamingException { + checkIsClosed(); + beginNextTransactionImpl(); + } + + private void beginNextTransactionImpl() throws StreamingException { + beginNextTransactionImpl("No more transactions available in" + + " next batch for connection: " + conn + " user: " + username); + lastTxnUsed = getCurrentTxnId(); + lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo); + createdPartitions = Sets.newHashSet(); + try { + LockResponse res = conn.getMSC().lock(lockRequest); + if (res.getState() != LockState.ACQUIRED) { + throw new TransactionError("Unable to acquire lock on " + conn); + } + } catch (TException e) { + throw new TransactionError("Unable to acquire lock on " + conn, e); + } + } + + public void commitWithPartitions(Set partitions) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + commitImpl(partitions); + success = true; + } finally { + markDead(success); + } + } + + private void commitImpl(Set partitions) throws StreamingException { + try { + recordWriter.flush(); + TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); + if (conn.isDynamicPartitioning()) { + List partNames = new ArrayList<>(recordWriter.getPartitions()); + createdPartitions.addAll(partNames); + if (partitions != null) { + partNames.addAll(partitions); + } + if (!partNames.isEmpty()) { + conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), + txnToWriteId.getWriteId(), conn.getDatabase(), + conn.getTable().getTableName(), partNames, + DataOperationType.INSERT); + } + } + transactionLock.lock(); + try { + conn.getMSC().commitTxn(txnToWriteId.getTxnId()); + // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. 
+ // the current transaction is either going to be committed or will fail, so there is no need to heartbeat it. + if (currentTxnIndex + 1 < txnToWriteIds.size()) { + minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); + } else { + // exhausted the batch, no longer have to heartbeat for current txn batch + minTxnId.set(-1); + } + } finally { + transactionLock.unlock(); + } + state = HiveStreamingConnection.TxnState.COMMITTED; + txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.COMMITTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TxnAbortedException e) { + throw new TransactionError("Aborted transaction " + + "cannot be committed", e); + } catch (TException e) { + throw new TransactionError("Unable to commit transaction " + + getCurrentTxnId(), e); + } + } + + public void abort() throws StreamingException { + if (isTxnClosed.get()) { + /* + * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all + * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction() + * after an error doesn't get misleading errors + */ + return; + } + abort(false); + } + + private void abort(final boolean abortAllRemaining) throws StreamingException { + abortImpl(abortAllRemaining); + } + + private void abortImpl(boolean abortAllRemaining) throws StreamingException { + if (minTxnId == null) { + return; + } + + transactionLock.lock(); + try { + if (abortAllRemaining) { + // we are aborting all txns in the current batch, so no need to heartbeat + minTxnId.set(-1); + //when the last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn + //so we need to start from the next one, if any. Also if the batch was created but + //fetchTransactionBatch() was never called, we want to start with the first txn + int minOpenTxnIndex = Math.max(currentTxnIndex + + (state == HiveStreamingConnection.TxnState.ABORTED + || state == HiveStreamingConnection.TxnState.COMMITTED + ? 1 : 0), 0); + for (currentTxnIndex = minOpenTxnIndex; + currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) { + conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); + txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED; + } + currentTxnIndex--; //since the loop left it == txnToWriteIds.size() + } else { + // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat + // if the current txn is the last one in the batch. + if (currentTxnIndex + 1 < txnToWriteIds.size()) { + minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); + } else { + // exhausted the batch, no longer have to heartbeat + minTxnId.set(-1); + } + long currTxnId = getCurrentTxnId(); + if (currTxnId > 0) { + conn.getMSC().rollbackTxn(currTxnId); + txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED; + } + } + state = HiveStreamingConnection.TxnState.ABORTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Unable to abort invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TException e) { + throw new TransactionError("Unable to abort transaction id : " + + getCurrentTxnId(), e); + } finally { + transactionLock.unlock(); + } + } + + /** + * Close the TransactionBatch. This will abort any still open txns in this batch.
+ * + * @throws StreamingException - failure when closing transaction batch + */ + public void close() throws StreamingException { + if (isClosed()) { + return; + } + isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async + try { + abort(true); //abort all remaining txns + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to abort", ex); + } + try { + closeImpl(); + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to close", ex); + } + } + + private void closeImpl() throws StreamingException { + state = HiveStreamingConnection.TxnState.INACTIVE; + recordWriter.close(); + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + } + + private static LockRequest createLockRequest(final HiveStreamingConnection connection, + String partNameForLock, String user, long txnId, String agentInfo) { + LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo); + requestBuilder.setUser(user); + requestBuilder.setTransactionId(txnId); + + LockComponentBuilder lockCompBuilder = new LockComponentBuilder() + .setDbName(connection.getDatabase()) + .setTableName(connection.getTable().getTableName()) + .setShared() + .setOperationType(DataOperationType.INSERT); + if (connection.isDynamicPartitioning()) { + lockCompBuilder.setIsDynamicPartitionWrite(true); + } + if (partNameForLock != null && !partNameForLock.isEmpty()) { + lockCompBuilder.setPartitionName(partNameForLock); + } + requestBuilder.addLockComponent(lockCompBuilder.build()); + + return requestBuilder.build(); + } + + /** + * @return the list of created partitions. + */ + @Override + public Set getPartitions() { + return createdPartitions; + } +} diff --git streaming/src/java/org/apache/hive/streaming/TransactionError.java streaming/src/java/org/apache/hive/streaming/TransactionError.java index ae56e7c726..703301243f 100644 --- streaming/src/java/org/apache/hive/streaming/TransactionError.java +++ streaming/src/java/org/apache/hive/streaming/TransactionError.java @@ -18,12 +18,15 @@ package org.apache.hive.streaming; +/** + * Transaction error. + */ public class TransactionError extends StreamingException { - TransactionError(String msg, Exception e) { + public TransactionError(String msg, Exception e) { super(msg + (e == null ? "" : ": " + e.getMessage()), e); } - TransactionError(String msg) { + public TransactionError(String msg) { super(msg); } } diff --git streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java new file mode 100644 index 0000000000..68b0906b3d --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * Receives a single writeId. Doesn't open connections to the metastore + * so the commit has to be done externally by the entity that created + * the writeId. + */ +public class UnManagedSingleTransaction extends AbstractStreamingTransaction { + private static final Logger LOG = LoggerFactory.getLogger( + UnManagedSingleTransaction.class.getName()); + private final String username; + private final HiveStreamingConnection conn; + private final Set partitions = Sets.newHashSet(); + + public UnManagedSingleTransaction(HiveStreamingConnection conn) + throws StreamingException{ + assert conn.getWriteId() != null; + + this.conn = conn; + this.username = conn.getUsername(); + this.recordWriter = conn.getRecordWriter(); + this.state = HiveStreamingConnection.TxnState.INACTIVE; + + txnToWriteIds = Lists.newArrayList(new TxnToWriteId(-1, + conn.getWriteId())); + + boolean success = false; + try { + recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), + txnToWriteIds.get(0).getWriteId(), conn.getStatementId()); + success = true; + } finally { + markDead(success); + } + } + + @Override + public void beginNextTransaction() throws StreamingException { + beginNextTransactionImpl("No more transactions available in" + + " next batch for connection: " + conn + " user: " + username); + } + + @Override + public void commitWithPartitions(Set partitions) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + commitImpl(); + success = true; + } finally { + markDead(success); + } + } + + private void commitImpl() throws StreamingException { + recordWriter.flush(); + List partNames = new ArrayList<>(recordWriter.getPartitions()); + partitions.addAll(partNames); + state = HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT; + } + + @Override + public void abort() { + if (isTxnClosed.get()) { + return; + } + state = HiveStreamingConnection.TxnState.ABORTED; + } + + @Override + public void close() throws StreamingException { + if (isClosed()) { + return; + } + isTxnClosed.set(true); + abort(); + try { + closeImpl(); + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to close", ex); + } + } + + private void closeImpl() throws StreamingException { + state = HiveStreamingConnection.TxnState.INACTIVE; + recordWriter.close(); + } + + @Override + public String toString() { + if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { + return "{}"; + } + return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getWriteId() + + "] on connection = " + conn + "; " + "status=" + state; + } + + /** + * @return This class doesn't have a connection to the metastore so it won't + * create any partition + */ + @Override + public Set getPartitions() { + return partitions; + } +} 
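For orientation, here is a minimal sketch (modeled on testConnectionWithWriteId in the test changes below) of how an UnManagedSingleTransaction is expected to be driven: a controlling connection opens the transaction and allocates the write id, and a secondary connection built with withWriteId()/withStatementId()/withTableObject() writes against that id without committing anything in the metastore itself. The database and table names, the delimiter, and the HiveConf are illustrative assumptions, not part of this patch.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class UnmanagedTransactionSketch {
  public static void main(String[] args) throws StreamingException {
    HiveConf conf = new HiveConf();

    // Controlling connection: owns the metastore transaction and the write id.
    HiveStreamingConnection controller = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("writeidconnection")
        .withRecordWriter(StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',').build())
        .withHiveConf(conf)
        .connect();
    controller.beginTransaction();

    Table tableObject = controller.getTable();
    Long writeId = controller.getCurrentWriteId();

    // Secondary connection: backed by UnManagedSingleTransaction, it reuses the
    // controller's write id and never talks to the metastore to commit.
    HiveStreamingConnection writerConn = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("writeidconnection")
        .withRecordWriter(StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',').build())
        .withHiveConf(conf)
        .withWriteId(writeId)
        .withStatementId(1)
        .withTableObject(tableObject)
        .connect();

    writerConn.beginTransaction();
    writerConn.write("a1,b2".getBytes());
    // For an unmanaged transaction this only flushes the writer and moves the
    // state to PREPARED_FOR_COMMIT; the rows stay invisible for now.
    writerConn.commitTransaction();
    writerConn.close();

    // The actual metastore commit happens on the controlling connection.
    controller.commitTransaction();
    controller.close();
  }
}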
diff --git streaming/src/java/org/apache/hive/streaming/package-info.java streaming/src/java/org/apache/hive/streaming/package-info.java new file mode 100644 index 0000000000..a9f3faee26 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package grouping streaming classes. + */ +package org.apache.hive.streaming; diff --git streaming/src/java/org/apache/hive/streaming/package.html streaming/src/java/org/apache/hive/streaming/package.html index 2b45792807..19b39e7cfd 100644 --- streaming/src/java/org/apache/hive/streaming/package.html +++ streaming/src/java/org/apache/hive/streaming/package.html @@ -43,7 +43,8 @@

HCatalog Streaming API -- high level description

Note on packaging: The APIs are defined in the -org.apache.hive.streaming Java package and included as +org.apache.hive.streaming and +org.apache.hive.streaming.transaction Java packages and included as the hive-streaming jar.

STREAMING REQUIREMENTS

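To go with the packaging note in package.html above, here is a minimal sketch of the ordinary managed flow that the updated tests keep exercising, where the connection itself opens, heartbeats, and commits the transactions. The table name, delimiter, and configuration are illustrative and error handling is omitted for brevity.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class ManagedStreamingSketch {
  public static void main(String[] args) throws StreamingException {
    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("alerts")
        .withRecordWriter(StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',').build())
        .withHiveConf(new HiveConf())
        .connect();
    // The connection manages the transaction (and heartbeats) itself.
    connection.beginTransaction();
    connection.write("1,Hello streaming".getBytes());
    connection.commitTransaction();
    connection.close();
  }
}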
diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 8b5e508d94..1c9e43fad1 100644 --- streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,13 +61,13 @@ import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; -import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; @@ -82,6 +83,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.RecordReader; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.compactor.Worker; @@ -150,7 +152,8 @@ public FileStatus getFileStatus(Path path) throws IOException { if (file.canExecute()) { mod |= 0111; } - return new FileStatus(file.length(), file.isDirectory(), 1, 1024, + return new FileStatus(file.length(), file.isDirectory(), + 1, 1024, file.lastModified(), file.lastModified(), FsPermission.createImmutable(mod), "owen", "users", path); } @@ -418,6 +421,123 @@ public void testNoBuckets() throws Exception { Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000")); } + @Test + public void testGetDeltaPath() throws Exception { + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + Path path = connection.getDeltaFileLocation(partitionVals, 0, + 5L, 5L, 9); + Assert.assertTrue(path.toString().endsWith("testing.db/alerts/continent" + + "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000")); + } + + @Test + public void testConnectionWithWriteId() throws Exception { + queryTable(driver, "drop table if exists default.writeidconnection"); + queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " + + "TBLPROPERTIES('transactional'='true')"); + queryTable(driver, "insert into default.writeidconnection values('a0','bar')"); + + List rs = queryTable(driver, "select * from default.writeidconnection"); + Assert.assertEquals(1, rs.size()); + Assert.assertEquals("a0\tbar", rs.get(0)); + + StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + 
HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("writeidconnection") + .withRecordWriter(writerT) + .withHiveConf(conf) + .connect(); + transactionConnection.beginTransaction(); + + Table tObject = transactionConnection.getTable(); + Long writeId = transactionConnection.getCurrentWriteId(); + + Assert.assertNotNull(tObject); + Assert.assertNotNull(writeId); + + StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("writeidconnection") + .withRecordWriter(writerOne) + .withHiveConf(conf) + .withWriteId(writeId) + .withStatementId(1) + .withTableObject(tObject) + .connect(); + + StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withRecordWriter(writerTwo) + .withHiveConf(conf) + .withWriteId(writeId) + .withStatementId(2) + .withTableObject(tObject) + .connect(); + + Assert.assertNotNull(connectionOne); + Assert.assertNotNull(connectionTwo); + + connectionOne.beginTransaction(); + connectionTwo.beginTransaction(); + connectionOne.write("a1,b2".getBytes()); + connectionTwo.write("a5,b6".getBytes()); + connectionOne.write("a3,b4".getBytes()); + connectionOne.commitTransaction(); + connectionTwo.commitTransaction(); + + Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT, + connectionOne.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT, + connectionTwo.getCurrentTransactionState()); + + try { + connectionOne.beginTransaction(); + Assert.fail("second beginTransaction should have thrown a " + + "StreamingException"); + } catch (StreamingException e) { + + } + + connectionOne.close(); + connectionTwo.close(); + + rs = queryTable(driver, "select ROW__ID, a, b, " + + "INPUT__FILE__NAME from default.writeidconnection order by ROW__ID"); + // Nothing here since it hasn't been committed + Assert.assertEquals(1, rs.size()); + + transactionConnection.commitTransaction(); + + rs = queryTable(driver, "select ROW__ID, a, b, " + + "INPUT__FILE__NAME from default.writeidconnection order by a"); + Assert.assertEquals(4, rs.size()); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000")); + } + @Test public void testAllTypesDelimitedWriter() throws Exception { queryTable(driver, "drop table if exists default.alltypes"); @@ -619,7 +739,7 @@ public void testAutoRollTransactionBatch() throws Exception { } /** - * this is a clone from TestTxnStatement2.... + * this is a clone from TestHiveStreamingConnection.TxnStatement2.... 
*/ public static void runWorker(HiveConf hiveConf) throws MetaException { AtomicBoolean stop = new AtomicBoolean(true); @@ -956,11 +1076,158 @@ public void testAddPartition() throws Exception { // Create partition Assert.assertNotNull(connection); + connection.beginTransaction(); + connection.write("3,Hello streaming - once again".getBytes()); + connection.commitTransaction(); + + // Ensure partition is present + Partition p = msClient.getPartition(dbName, tblName, newPartVals); + Assert.assertNotNull("Did not find added partition", p); + } + + @Test + public void testAddPartitionWithWriteId() throws Exception { + List newPartVals = new ArrayList(2); + newPartVals.add("WriteId_continent"); + newPartVals.add("WriteId_country"); + + StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(newPartVals) + .withRecordWriter(writerT) + .withHiveConf(conf) + .connect(); + transactionConnection.beginTransaction(); + + Table tObject = transactionConnection.getTable(); + Long writeId = transactionConnection.getCurrentWriteId(); + + Assert.assertNotNull(tObject); + Assert.assertNotNull(writeId); + + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(newPartVals) + .withRecordWriter(writer) + .withHiveConf(conf) + .withWriteId(writeId) + .withStatementId(1) + .withTableObject(tObject) + .connect(); + + Assert.assertNotNull(connection); + + connection.beginTransaction(); + connection.write("3,Hello streaming - once again".getBytes()); + connection.commitTransaction(); + + Set partitions = new HashSet<>(connection.getPartitions()); + + connection.close(); + + // Ensure partition is not present + try { + msClient.getPartition(dbName, tblName, newPartVals); + Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised"); + } catch (NoSuchObjectException e) {} + + transactionConnection.commitTransactionWithPartition(partitions); + // Ensure partition is present - Partition p = msClient.getPartition(dbName, tblName, partitionVals); + Partition p = msClient.getPartition(dbName, tblName, newPartVals); Assert.assertNotNull("Did not find added partition", p); } + @Test + public void testAddDynamicPartitionWithWriteId() throws Exception { + queryTable(driver, "drop table if exists default.writeiddynamic"); + queryTable(driver, "create table default.writeiddynamic (a" + + " string, b string) partitioned by (c string, d string)" + + " stored as orc TBLPROPERTIES('transactional'='true')"); + + StrictDelimitedInputWriter writerT = + StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + HiveStreamingConnection transactionConnection = + HiveStreamingConnection.newBuilder().withDatabase("default") + .withTable("writeiddynamic").withRecordWriter(writerT) + .withHiveConf(conf).connect(); + transactionConnection.beginTransaction(); + + Table tObject = transactionConnection.getTable(); + Long writeId = transactionConnection.getCurrentWriteId(); + + Assert.assertNotNull(tObject); + Assert.assertNotNull(writeId); + + StrictDelimitedInputWriter writerOne = + StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + 
HiveStreamingConnection connectionOne = + HiveStreamingConnection.newBuilder().withDatabase("default") + .withTable("writeiddynamic").withRecordWriter(writerOne) + .withHiveConf(conf).withWriteId(writeId).withStatementId(1) + .withTableObject(tObject).connect(); + + StrictDelimitedInputWriter writerTwo = + StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + HiveStreamingConnection connectionTwo = + HiveStreamingConnection.newBuilder().withDatabase("default") + .withTable("writeiddynamic") + .withRecordWriter(writerTwo) + .withHiveConf(conf).withWriteId(writeId).withStatementId(1) + .withTableObject(tObject) + .connect(); + + Assert.assertNotNull(connectionOne); + + connectionTwo.beginTransaction(); + connectionOne.beginTransaction(); + connectionOne.write("1,2,3,4".getBytes()); + connectionOne.write("1,2,5,6".getBytes()); + connectionTwo.write("1,2,30,40".getBytes()); + connectionOne.write("1,2,7,8".getBytes()); + connectionTwo.write("1,2,50,60".getBytes()); + connectionOne.write("1,2,9,10".getBytes()); + connectionOne.commitTransaction(); + connectionTwo.commitTransaction(); + + Set partitionsOne = new HashSet<>(connectionOne.getPartitions()); + Assert.assertEquals(4, partitionsOne.size()); + + Set partitionsTwo = new HashSet<>(connectionTwo.getPartitions()); + Assert.assertEquals(2, partitionsTwo.size()); + + connectionOne.close(); + connectionTwo.close(); + + try { + String partitionName = partitionsOne.iterator().next(); + msClient.getPartition("default", "writeiddynamic", partitionName); + Assert.fail( + "Partition shouldn't exist so a NoSuchObjectException should have been raised"); + } catch (NoSuchObjectException e) { + } + + partitionsOne.addAll(partitionsTwo); + Set allPartitions = partitionsOne; + transactionConnection.commitTransactionWithPartition(allPartitions); + + // Ensure partition is present + for (String partition : allPartitions) { + Partition p = + msClient.getPartition("default", "writeiddynamic", + partition); + Assert.assertNotNull("Did not find added partition", p); + } + } + @Test public void testTransactionBatchEmptyCommit() throws Exception { // 1) to partitioned table @@ -977,8 +1244,8 @@ public void testTransactionBatchEmptyCommit() throws Exception { .connect(); connection.beginTransaction(); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); // 2) To unpartitioned table @@ -995,8 +1262,8 @@ public void testTransactionBatchEmptyCommit() throws Exception { connection.beginTransaction(); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1115,8 +1382,8 @@ public void testTransactionBatchEmptyAbort() throws Exception { connection.beginTransaction(); connection.abortTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); // 2) to unpartitioned table @@ -1133,8 +1400,8 @@ public void testTransactionBatchEmptyAbort() throws Exception { connection.beginTransaction(); connection.abortTransaction(); - 
Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1156,20 +1423,20 @@ public void testTransactionBatchCommitDelimited() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); // 2nd Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("2,Welcome to streaming".getBytes()); // data should not be visible @@ -1182,8 +1449,8 @@ public void testTransactionBatchCommitDelimited() throws Exception { connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, + connection.getCurrentTransactionState()); // To Unpartitioned table @@ -1199,13 +1466,13 @@ public void testTransactionBatchCommitDelimited() throws Exception { .connect(); // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1227,20 +1494,20 @@ public void testTransactionBatchCommitRegex() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); // 2nd Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("2,Welcome to streaming".getBytes()); // data should not be visible @@ -1252,8 +1519,8 @@ public void testTransactionBatchCommitRegex() throws Exception { "{2, Welcome to streaming}"); connection.close(); - 
Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, + connection.getCurrentTransactionState()); // To Unpartitioned table regex = "([^:]*):(.*)"; @@ -1271,13 +1538,13 @@ public void testTransactionBatchCommitRegex() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1:Hello streaming".getBytes()); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1328,20 +1595,20 @@ public void testTransactionBatchCommitJson() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}"; connection.write(rec1.getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, + connection.getCurrentTransactionState()); List rs = queryTable(driver, "select * from " + dbName + "." 
+ tblName); Assert.assertEquals(1, rs.size()); @@ -1399,20 +1666,20 @@ public void testRemainingTransactions() throws Exception { connection.beginTransaction(); Assert.assertEquals(--initialCount, connection.remainingTransactions()); for (int rec = 0; rec < 2; ++rec) { - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write((batch * rec + ",Hello streaming").getBytes()); } connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); ++batch; } Assert.assertEquals(0, connection.remainingTransactions()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, + connection.getCurrentTransactionState()); connection = HiveStreamingConnection.newBuilder() .withDatabase(dbName) @@ -1430,20 +1697,20 @@ public void testRemainingTransactions() throws Exception { connection.beginTransaction(); Assert.assertEquals(--initialCount, connection.remainingTransactions()); for (int rec = 0; rec < 2; ++rec) { - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write((batch * rec + ",Hello streaming").getBytes()); } connection.abortTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, + connection.getCurrentTransactionState()); ++batch; } Assert.assertEquals(0, connection.remainingTransactions()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE, + connection.getCurrentTransactionState()); } @Test @@ -1468,8 +1735,8 @@ public void testTransactionBatchAbort() throws Exception { checkNothingWritten(partLoc); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); @@ -1507,8 +1774,8 @@ public void testTransactionBatchAbortAndCommit() throws Exception { checkNothingWritten(partLoc); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.beginTransaction(); connection.write("1,Hello streaming".getBytes()); @@ -1577,8 +1844,8 @@ public void testMultipleTransactionBatchCommits() throws Exception { "2\tWelcome to streaming", "3\tHello streaming - once again", "4\tWelcome to streaming - once again"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1690,10 +1957,10 @@ public void testInterleavedTransactionBatchCommits() throws Exception { 
"3\tHello streaming - once again", "4\tWelcome to streaming - once again"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection2.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, + connection2.getCurrentTransactionState()); connection.close(); connection2.close(); @@ -2511,7 +2778,8 @@ private void recordOffsets(final HiveConf conf, final String dbLocation, } @Test - public void testErrorHandling() throws Exception { + public void testErrorHandling() + throws Exception { String agentInfo = "UT_" + Thread.currentThread().getName(); runCmdOnDriver("create database testErrors"); runCmdOnDriver("use testErrors"); @@ -2538,8 +2806,12 @@ public void testErrorHandling() throws Exception { GetOpenTxnsInfoResponse r = msClient.showTxns(); Assert.assertEquals("HWM didn'table match", 17, r.getTxn_high_water_mark()); List ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); - Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + Assert.assertEquals("wrong status ti(0)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(1).getState()); try { @@ -2621,10 +2893,16 @@ public void testErrorHandling() throws Exception { r = msClient.showTxns(); Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); - Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + Assert.assertEquals("wrong status ti(0)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(1).getState()); //txnid 3 was committed and thus not open - Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState()); + Assert.assertEquals("wrong status ti(2)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(2).getState()); connection.close(); writer.disableErrors(); @@ -2651,8 +2929,12 @@ public void testErrorHandling() throws Exception { r = msClient.showTxns(); Assert.assertEquals("HWM didn'table match", 21, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState()); - Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState()); + Assert.assertEquals("wrong status ti(3)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(3).getState()); + Assert.assertEquals("wrong status ti(4)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(4).getState()); } // assumes un partitioned table @@ -2953,5 +3235,12 @@ void enableErrors() { void disableErrors() { shouldThrow = false; } + + @Override + public Path getDeltaFileLocation(List partitionValues, + Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId, + Table table) throws StreamingException { + return null; + } } }