diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java index dfd4452ad4..454c37a000 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java @@ -106,7 +106,7 @@ public int encode(AcidOutputFormat.Options options) { private static final int NUM_STATEMENT_ID_BITS = 12; private static final int MAX_VERSION = (1 << NUM_VERSION_BITS) - 1; private static final int MAX_BUCKET_ID = (1 << NUM_BUCKET_ID_BITS) - 1; - private static final int MAX_STATEMENT_ID = (1 << NUM_STATEMENT_ID_BITS) - 1; + public static final int MAX_STATEMENT_ID = (1 << NUM_STATEMENT_ID_BITS) - 1; public static BucketCodec determineVersion(int bucket) { assert 7 << 29 == BucketCodec.TOP3BITS_MASK; diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 9e90d36dae..3cd90a8fa2 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -38,8 +38,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hive.common.HeapMemoryMonitor; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -58,14 +60,25 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.streaming.transaction.FixedStatementIdChooser; +import org.apache.hive.streaming.transaction.StatementIdChooser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class AbstractRecordWriter implements RecordWriter { private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName()); + /** + * Maximum number of times the writer can fail because there was a + * problem because the file it's writing to already exists/is already + * being created. + */ + private static final int MAX_BEING_CREATED_WRITE_RETRIES = 10; private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]"; + private final StatementIdChooser statementIdChooser; + private Integer statementId; protected HiveConf conf; protected StreamingConnection conn; protected Table table; @@ -107,6 +120,16 @@ public AbstractRecordWriter(final String lineDelimiter) { this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ? DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter; + statementIdChooser = new FixedStatementIdChooser(-1); + statementId = statementIdChooser.next(); + } + + public AbstractRecordWriter(final String lineDelimiter, + StatementIdChooser statementIdChooser) { + this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ? 
+ DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter; + this.statementIdChooser = statementIdChooser; + statementId = statementIdChooser.next(); } protected static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener { @@ -430,7 +453,27 @@ public void write(final long writeId, final byte[] record) throws StreamingExcep Object encodedRow = encode(record); int bucket = getBucket(encodedRow); List partitionValues = getPartitionValues(encodedRow); - getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow); + + for (int i = 0; i < MAX_BEING_CREATED_WRITE_RETRIES + 1; i++) { + try { + getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow); + break; + } catch (RemoteException rme) { + if (rme.getClassName().equals(AlreadyBeingCreatedException.class.getName())) { + if (i == MAX_BEING_CREATED_WRITE_RETRIES) { + throw rme; + } + reprepareUpdater(partitionValues); + } else { + throw rme; + } + } catch (FileAlreadyExistsException fae) { + if (i == MAX_BEING_CREATED_WRITE_RETRIES) { + throw fae; + } + reprepareUpdater(partitionValues); + } + } // ingest size bytes gets resetted on flush() whereas connection stats is not conn.getConnectionStats().incrementRecordsWritten(); conn.getConnectionStats().incrementRecordsSize(record.length); @@ -440,6 +483,14 @@ public void write(final long writeId, final byte[] record) throws StreamingExcep } } + private void reprepareUpdater(List partitionValues) { + updaters.remove(getKeyFromPartitions(partitionValues)); + statementId = statementIdChooser.next(); + + LOG.info("Bucket file exists, trying with again using new statementId " + + statementId); + } + protected void checkAutoFlush() throws StreamingIOFailure { if (!autoFlush) { return; @@ -492,7 +543,7 @@ protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucket .tableProperties(tblProperties) .minimumWriteId(minWriteId) .maximumWriteId(maxWriteID) - .statementId(-1) + .statementId(statementId) .finalDestination(partitionPath)); } @@ -501,7 +552,7 @@ protected RecordUpdater getRecordUpdater(List partitionValues, int bucke String key; Path destLocation; try { - key = partitionValues == null ? fullyQualifiedTableName : partitionValues.toString(); + key = getKeyFromPartitions(partitionValues); // add partition in metastore for dynamic partition. We make a metastore call for every new partition value that // we encounter even if partition already exists (exists check require a metastore call anyways). if (partitionPaths.containsKey(key)) { @@ -547,6 +598,11 @@ protected RecordUpdater getRecordUpdater(List partitionValues, int bucke return recordUpdater; } + private String getKeyFromPartitions(List partitionValues) { + return partitionValues == null ? 
+ fullyQualifiedTableName : partitionValues.toString(); + } + protected List initializeBuckets() { List result = new ArrayList<>(totalBuckets); for (int bucket = 0; bucket < totalBuckets; bucket++) { diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 6cf14b064f..b03838b214 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -25,53 +25,39 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; - +import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.LockComponentBuilder; -import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.hive.streaming.transaction.StreamingTransaction; +import org.apache.hive.streaming.transaction.TransactionBatch; +import org.apache.hive.streaming.transaction.TxnState; +import org.apache.hive.streaming.transaction.UnManagedSingleTransaction; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Streaming connection implementation for hive. 
To create a streaming connection, use the builder API @@ -119,24 +105,9 @@ private static final Logger LOG = LoggerFactory.getLogger(HiveStreamingConnection.class.getName()); private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083"; - private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1; - private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000; + private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 10; private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true; - public enum TxnState { - INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"); - - private final String code; - - TxnState(String code) { - this.code = code; - } - - public String toString() { - return code; - } - } - // fields populated from builder private String database; private String table; @@ -144,13 +115,13 @@ public String toString() { private String agentInfo; private int transactionBatchSize; private RecordWriter recordWriter; - private TransactionBatch currentTransactionBatch; + private StreamingTransaction currentTransactionBatch; private HiveConf conf; private boolean streamingOptimizations; private AtomicBoolean isConnectionClosed = new AtomicBoolean(false); // internal fields - private boolean isPartitionedTable; + private Boolean isPartitionedTable; private IMetaStoreClient msClient; private IMetaStoreClient heartbeatMSClient; private final String username; @@ -158,6 +129,10 @@ public String toString() { private Table tableObject = null; private String metastoreUri; private ConnectionStats connectionStats; + private final Long writeId; + private boolean manageTransactions; + private int countTransactions = 0; + private List createdPartition = Lists.newArrayList(); private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -166,6 +141,11 @@ private HiveStreamingConnection(Builder builder) throws StreamingException { this.conf = builder.hiveConf; this.agentInfo = builder.agentInfo; this.streamingOptimizations = builder.streamingOptimizations; + this.writeId = builder.writeId; + this.tableObject = builder.tableObject; + this.setPartitionedTable(builder.isPartitioned); + this.manageTransactions = builder.manageTransactions; + UserGroupInformation loggedInUser = null; try { loggedInUser = UserGroupInformation.getLoginUser(); @@ -193,13 +173,18 @@ private HiveStreamingConnection(Builder builder) throws StreamingException { if (conf == null) { conf = createHiveConf(this.getClass(), DEFAULT_METASTORE_URI); } + overrideConfSettings(conf); - this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()); - this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection"); - // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are - // isolated from the other transaction related RPC calls. - this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection-heartbeat"); - validateTable(); + if (manageTransactions) { + this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()); + this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, + "streaming-connection"); + // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are + // isolated from the other transaction related RPC calls. 
+ this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, + "streaming-connection-heartbeat"); + validateTable(); + } LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString()); } @@ -217,6 +202,10 @@ public static Builder newBuilder() { private int transactionBatchSize = DEFAULT_TRANSACTION_BATCH_SIZE; private boolean streamingOptimizations = DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED; private RecordWriter recordWriter; + private Long writeId; + private boolean manageTransactions = true; + private Table tableObject; + private Boolean isPartitioned; /** * Specify database to use for streaming connection. @@ -314,6 +303,32 @@ public Builder withRecordWriter(final RecordWriter recordWriter) { return this; } + /** + * Specify this parameter if we want the current connection + * to join an ongoing transaction without having to query + * the metastore to create it. + * @param writeId write id + * @return builder + */ + public Builder withWriteId(final long writeId) { + this.writeId = writeId; + manageTransactions = false; + return this; + } + + /** + * Specify the table object since sometimes no connections + * to the metastore will be opened. + * @param table table object. + * @return builder + */ + public Builder withTableObject(Table table) { + this.tableObject = table; + this.isPartitioned = tableObject.getPartitionKeys() != null + && !tableObject.getPartitionKeys().isEmpty(); + return this; + } + /** * Returning a streaming connection to hive. * @@ -324,11 +339,27 @@ public HiveStreamingConnection connect() throws StreamingException { throw new StreamingException("Database cannot be null for streaming connection"); } if (table == null) { - throw new StreamingException("Table cannot be null for streaming connection"); + if (tableObject == null) { + throw new StreamingException("Table and table object cannot be " + + "null for streaming connection"); + } else { + table = tableObject.getTableName(); + } + } + + if (tableObject != null && !tableObject.getTableName().equals(table)) { + throw new StreamingException("Table must match tableObject table name"); } + if (recordWriter == null) { throw new StreamingException("Record writer cannot be null for streaming connection"); } + if ((writeId != null && tableObject == null) || + (writeId == null && tableObject != null)){ + throw new StreamingException("If writeId is set, tableObject " + + "must be set as well and vice versa"); + } + HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this); // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before // filesystem close (to avoid ClosedChannelException) @@ -338,7 +369,7 @@ public HiveStreamingConnection connect() throws StreamingException { } } - private void setPartitionedTable(boolean isPartitionedTable) { + private void setPartitionedTable(Boolean isPartitionedTable) { this.isPartitionedTable = isPartitionedTable; } @@ -356,7 +387,8 @@ private String toConnectionInfoString() { "username: " + username + ", " + "secure-mode: " + secureMode + ", " + "record-writer: " + recordWriter.getClass().getSimpleName() + ", " + - "agent-info: " + agentInfo + " }"; + "agent-info: " + agentInfo + ", " + + "writeId: " + writeId + " }"; } @VisibleForTesting @@ -386,12 +418,12 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu return new PartitionInfo(partName, partLocation, exists); } - IMetaStoreClient getMSC() { + public IMetaStoreClient getMSC() { 
connectionStats.incrementMetastoreCalls(); return msClient; } - IMetaStoreClient getHeatbeatMSC() { + public IMetaStoreClient getHeatbeatMSC() { connectionStats.incrementMetastoreCalls(); return heartbeatMSClient; } @@ -424,43 +456,6 @@ private void validateTable() throws InvalidTable, ConnectionError { } } - private static class HeartbeatRunnable implements Runnable { - private final HiveStreamingConnection conn; - private final AtomicLong minTxnId; - private final long maxTxnId; - private final ReentrantLock transactionLock; - private final AtomicBoolean isTxnClosed; - - HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId, - final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) { - this.conn = conn; - this.minTxnId = minTxnId; - this.maxTxnId = maxTxnId; - this.transactionLock = transactionLock; - this.isTxnClosed = isTxnClosed; - } - - @Override - public void run() { - transactionLock.lock(); - try { - if (minTxnId.get() > 0) { - HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId); - if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { - LOG.error("Heartbeat failure: {}", resp.toString()); - isTxnClosed.set(true); - } else { - LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId); - } - } - } catch (TException e) { - LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e); - } finally { - transactionLock.unlock(); - } - } - } - private void beginNextTransaction() throws StreamingException { if (currentTransactionBatch == null) { currentTransactionBatch = createNewTransactionBatch(); @@ -481,8 +476,18 @@ private void beginNextTransaction() throws StreamingException { currentTransactionBatch.beginNextTransaction(); } - private TransactionBatch createNewTransactionBatch() throws StreamingException { - return new TransactionBatch(this); + private StreamingTransaction createNewTransactionBatch() throws StreamingException { + countTransactions++; + if (manageTransactions) { + return new TransactionBatch(this); + } else { + if (countTransactions > 1) { + throw new StreamingException("If a writeId is passed for the " + + "construction of HiveStreaming only one transaction batch" + + " can be done"); + } + return new UnManagedSingleTransaction(this); + } } private void checkClosedState() throws StreamingException { @@ -496,7 +501,7 @@ private void checkState() throws StreamingException { if (currentTransactionBatch == null) { throw new StreamingException("Transaction batch is null. Missing beginTransaction?"); } - if (currentTransactionBatch.state != TxnState.OPEN) { + if (currentTransactionBatch.getCurrentTransactionState() != TxnState.OPEN) { throw new StreamingException("Transaction state is not OPEN. 
Missing beginTransaction?"); } } @@ -509,8 +514,15 @@ public void beginTransaction() throws StreamingException { @Override public void commitTransaction() throws StreamingException { + commitTransactionWithPartition(null); + } + + @Override + public void commitTransactionWithPartition(Set partitions) + throws StreamingException { checkState(); - currentTransactionBatch.commit(); + currentTransactionBatch.commitWithPartitions(partitions); + createdPartition.addAll(currentTransactionBatch.getCreatedPartitions()); connectionStats.incrementCommittedTransactions(); } @@ -549,8 +561,10 @@ public void close() { } catch (StreamingException e) { LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e); } finally { - getMSC().close(); - getHeatbeatMSC().close(); + if (manageTransactions) { + getMSC().close(); + getHeatbeatMSC().close(); + } } if (LOG.isInfoEnabled()) { LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats()); @@ -595,418 +609,6 @@ Long getCurrentTxnId() { return currentTransactionBatch.getCurrentTxnId(); } - private static class TransactionBatch { - private String username; - private HiveStreamingConnection conn; - private ScheduledExecutorService scheduledExecutorService; - private RecordWriter recordWriter; - private String partNameForLock = null; - private List txnToWriteIds; - private int currentTxnIndex = -1; - private TxnState state; - private LockRequest lockRequest = null; - // heartbeats can only be sent for open transactions. - // there is a race between committing/aborting a transaction and heartbeat. - // Example: If a heartbeat is sent for committed txn, exception will be thrown. - // Similarly if we don't send a heartbeat, metastore server might abort a txn - // for missed heartbeat right before commit txn call. - // This lock is used to mutex commit/abort and heartbeat calls - private final ReentrantLock transactionLock = new ReentrantLock(); - // min txn id is incremented linearly within a transaction batch. - // keeping minTxnId atomic as it is shared with heartbeat thread - private final AtomicLong minTxnId; - // max txn id does not change for a transaction batch - private final long maxTxnId; - - /** - * once any operation on this batch encounters a system exception - * (e.g. IOException on write) it's safest to assume that we can't write to the - * file backing this batch any more. 
This guards important public methods - */ - private final AtomicBoolean isTxnClosed = new AtomicBoolean(false); - private String agentInfo; - private int numTxns; - /** - * Tracks the state of each transaction - */ - private TxnState[] txnStatus; - /** - * ID of the last txn used by {@link #beginNextTransactionImpl()} - */ - private long lastTxnUsed; - - /** - * Represents a batch of transactions acquired from MetaStore - * - * @param conn - hive streaming connection - * @throws StreamingException if failed to create new RecordUpdater for batch - */ - private TransactionBatch(HiveStreamingConnection conn) throws StreamingException { - boolean success = false; - try { - if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) { - List partKeys = conn.tableObject.getPartitionKeys(); - partNameForLock = Warehouse.makePartName(partKeys, conn.staticPartitionValues); - } - this.conn = conn; - this.username = conn.username; - this.recordWriter = conn.recordWriter; - this.agentInfo = conn.agentInfo; - this.numTxns = conn.transactionBatchSize; - - setupHeartBeatThread(); - - List txnIds = openTxnImpl(username, numTxns); - txnToWriteIds = allocateWriteIdsImpl(txnIds); - assert (txnToWriteIds.size() == numTxns); - - txnStatus = new TxnState[numTxns]; - for (int i = 0; i < txnStatus.length; i++) { - assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i)); - txnStatus[i] = TxnState.OPEN; //Open matches Metastore state - } - this.state = TxnState.INACTIVE; - - // initialize record writer with connection and write id info - recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns - 1).getWriteId()); - this.minTxnId = new AtomicLong(txnIds.get(0)); - this.maxTxnId = txnIds.get(txnIds.size() - 1); - success = true; - } catch (TException e) { - throw new StreamingException(conn.toString(), e); - } finally { - //clean up if above throws - markDead(success); - } - } - - private void setupHeartBeatThread() { - // start heartbeat thread - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("HiveStreamingConnection-Heartbeat-Thread") - .build(); - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - long heartBeatInterval; - long initialDelay; - try { - // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2 - heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.conf); - } catch (LockException e) { - heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL; - } - // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager) - initialDelay = (long) (heartBeatInterval * 0.75 * Math.random()); - LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", - heartBeatInterval, initialDelay, conn.agentInfo); - Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed); - this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit - .MILLISECONDS); - } - - private List openTxnImpl(final String user, final int numTxns) throws TException { - return conn.getMSC().openTxns(user, numTxns).getTxn_ids(); - } - - private List allocateWriteIdsImpl(final List txnIds) throws TException { - return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.database, conn.table); - } - - @Override - public String toString() { - if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { - return "{}"; - } - StringBuilder sb = new 
StringBuilder(" TxnStatus["); - for (TxnState state : txnStatus) { - //'state' should not be null - future proofing - sb.append(state == null ? "N" : state); - } - sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); - return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId() - + "/" + txnToWriteIds.get(0).getWriteId() - + "..." - + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId() - + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId() - + "] on connection = " + conn + "; " + sb; - } - - private void beginNextTransaction() throws StreamingException { - checkIsClosed(); - beginNextTransactionImpl(); - } - - private void beginNextTransactionImpl() throws TransactionError { - state = TxnState.INACTIVE;//clear state from previous txn - if ((currentTxnIndex + 1) >= txnToWriteIds.size()) { - throw new InvalidTransactionState("No more transactions available in" + - " next batch for connection: " + conn + " user: " + username); - } - currentTxnIndex++; - state = TxnState.OPEN; - lastTxnUsed = getCurrentTxnId(); - lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo); - try { - LockResponse res = conn.getMSC().lock(lockRequest); - if (res.getState() != LockState.ACQUIRED) { - throw new TransactionError("Unable to acquire lock on " + conn); - } - } catch (TException e) { - throw new TransactionError("Unable to acquire lock on " + conn, e); - } - } - - long getCurrentTxnId() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.get(currentTxnIndex).getTxnId(); - } - return -1L; - } - - long getCurrentWriteId() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.get(currentTxnIndex).getWriteId(); - } - return -1L; - } - - TxnState getCurrentTransactionState() { - return state; - } - - int remainingTransactions() { - if (currentTxnIndex >= 0) { - return txnToWriteIds.size() - currentTxnIndex - 1; - } - return txnToWriteIds.size(); - } - - - public void write(final byte[] record) throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - recordWriter.write(getCurrentWriteId(), record); - success = true; - } catch (SerializationError ex) { - //this exception indicates that a {@code record} could not be parsed and the - //caller can decide whether to drop it or send it to dead letter queue. - //rolling back the txn and retrying won't help since the tuple will be exactly the same - //when it's replayed. - success = true; - throw ex; - } finally { - markDead(success); - } - } - - public void write(final InputStream inputStream) throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - recordWriter.write(getCurrentWriteId(), inputStream); - success = true; - } catch (SerializationError ex) { - //this exception indicates that a {@code record} could not be parsed and the - //caller can decide whether to drop it or send it to dead letter queue. - //rolling back the txn and retrying won'table help since the tuple will be exactly the same - //when it's replayed. - success = true; - throw ex; - } finally { - markDead(success); - } - } - - private void checkIsClosed() throws StreamingException { - if (isTxnClosed.get()) { - throw new StreamingException("Transaction" + toString() + " is closed()"); - } - } - - /** - * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue - * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail). 
- * This ensures that a client can't ignore these failures and continue to write. - */ - private void markDead(boolean success) throws StreamingException { - if (success) { - return; - } - close(); - } - - - void commit() throws StreamingException { - checkIsClosed(); - boolean success = false; - try { - commitImpl(); - success = true; - } finally { - markDead(success); - } - } - - private void commitImpl() throws StreamingException { - try { - recordWriter.flush(); - TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); - if (conn.isDynamicPartitioning()) { - List partNames = new ArrayList<>(recordWriter.getPartitions()); - conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table, - partNames, DataOperationType.INSERT); - } - transactionLock.lock(); - try { - conn.getMSC().commitTxn(txnToWriteId.getTxnId()); - // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. - // the current transaction is going to committed or fail, so don't need heartbeat for current transaction. - if (currentTxnIndex + 1 < txnToWriteIds.size()) { - minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); - } else { - // exhausted the batch, no longer have to heartbeat for current txn batch - minTxnId.set(-1); - } - } finally { - transactionLock.unlock(); - } - state = TxnState.COMMITTED; - txnStatus[currentTxnIndex] = TxnState.COMMITTED; - } catch (NoSuchTxnException e) { - throw new TransactionError("Invalid transaction id : " - + getCurrentTxnId(), e); - } catch (TxnAbortedException e) { - throw new TransactionError("Aborted transaction cannot be committed" - , e); - } catch (TException e) { - throw new TransactionError("Unable to commitTransaction transaction" - + getCurrentTxnId(), e); - } - } - - void abort() throws StreamingException { - if (isTxnClosed.get()) { - /* - * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all - * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction() - * error doesn't get misleading errors - */ - return; - } - abort(false); - } - - private void abort(final boolean abortAllRemaining) throws StreamingException { - abortImpl(abortAllRemaining); - } - - private void abortImpl(boolean abortAllRemaining) throws StreamingException { - if (minTxnId == null) { - return; - } - - transactionLock.lock(); - try { - if (abortAllRemaining) { - // we are aborting all txns in the current batch, so no need to heartbeat - minTxnId.set(-1); - //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn - //so we need to start from next one, if any. Also if batch was created but - //fetchTransactionBatch() was never called, we want to start with first txn - int minOpenTxnIndex = Math.max(currentTxnIndex + - (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); - for (currentTxnIndex = minOpenTxnIndex; - currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) { - conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); - txnStatus[currentTxnIndex] = TxnState.ABORTED; - } - currentTxnIndex--;//since the loop left it == txnToWriteIds.size() - } else { - // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat - // if the current txn is last in the batch. 
- if (currentTxnIndex + 1 < txnToWriteIds.size()) { - minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); - } else { - // exhausted the batch, no longer have to heartbeat - minTxnId.set(-1); - } - long currTxnId = getCurrentTxnId(); - if (currTxnId > 0) { - conn.getMSC().rollbackTxn(currTxnId); - txnStatus[currentTxnIndex] = TxnState.ABORTED; - } - } - state = TxnState.ABORTED; - } catch (NoSuchTxnException e) { - throw new TransactionError("Unable to abort invalid transaction id : " - + getCurrentTxnId(), e); - } catch (TException e) { - throw new TransactionError("Unable to abort transaction id : " - + getCurrentTxnId(), e); - } finally { - transactionLock.unlock(); - } - } - - public boolean isClosed() { - return isTxnClosed.get(); - } - - /** - * Close the TransactionBatch. This will abort any still open txns in this batch. - * - * @throws StreamingException - failure when closing transaction batch - */ - public void close() throws StreamingException { - if (isTxnClosed.get()) { - return; - } - isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async - try { - abort(true);//abort all remaining txns - } catch (Exception ex) { - LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); - throw new StreamingException("Unable to abort", ex); - } - try { - closeImpl(); - } catch (Exception ex) { - LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); - throw new StreamingException("Unable to close", ex); - } - } - - private void closeImpl() throws StreamingException { - state = TxnState.INACTIVE; - recordWriter.close(); - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - } - } - - static LockRequest createLockRequest(final HiveStreamingConnection connection, - String partNameForLock, String user, long txnId, String agentInfo) { - LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo); - requestBuilder.setUser(user); - requestBuilder.setTransactionId(txnId); - - LockComponentBuilder lockCompBuilder = new LockComponentBuilder() - .setDbName(connection.database) - .setTableName(connection.table) - .setShared() - .setOperationType(DataOperationType.INSERT); - if (connection.isDynamicPartitioning()) { - lockCompBuilder.setIsDynamicPartitionWrite(true); - } - if (partNameForLock != null && !partNameForLock.isEmpty()) { - lockCompBuilder.setPartitionName(partNameForLock); - } - requestBuilder.addLockComponent(lockCompBuilder.build()); - - return requestBuilder.build(); - } - } - private HiveConf createHiveConf(Class clazz, String metaStoreUri) { HiveConf conf = new HiveConf(clazz); if (metaStoreUri != null) { @@ -1049,6 +651,13 @@ private static void setHiveConf(HiveConf conf, String var) { conf.setBoolean(var, true); } + public List getTxnToWriteIds() { + if (currentTransactionBatch != null) { + return currentTransactionBatch.getTxnToWriteIds(); + } + return null; + } + @Override public HiveConf getHiveConf() { return conf; @@ -1083,4 +692,37 @@ public boolean isPartitionedTable() { public boolean isDynamicPartitioning() { return isPartitionedTable() && (staticPartitionValues == null || staticPartitionValues.isEmpty()); } + + @Override + public List getCreatedPartitions() { + return createdPartition; + } + + public String getUsername() { + return username; + } + + public String getDatabase() { + return database; + } + + public RecordWriter getRecordWriter() { + return recordWriter; + } + + public int getTransactionBatchSize() { + return 
transactionBatchSize; + } + + public HiveConf getConf() { + return conf; + } + + public Long getWriteId() { + return writeId; + } + + public Long getCurrentWriteId() { + return currentTransactionBatch.getCurrentWriteId(); + } } diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java index fbe00db1f7..bd8656e935 100644 --- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -19,9 +19,13 @@ package org.apache.hive.streaming; import java.io.InputStream; +import java.util.List; +import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import javax.annotation.Nullable; + public interface StreamingConnection extends ConnectionInfo, PartitionHandler { /** * Returns hive configuration object used during connection creation. @@ -60,6 +64,16 @@ */ void commitTransaction() throws StreamingException; + /** + * Commit a transaction to make the writes visible for readers. Include + * other partitions that may have been added independently. + * + * @param partitions - extra partitions to commit. + * @throws StreamingException - if there are errors when committing the open transaction. + */ + void commitTransactionWithPartition(@Nullable Set partitions) + throws StreamingException; + /** * Manually abort the opened transaction. * @@ -78,4 +92,10 @@ * @return - connection stats */ ConnectionStats getConnectionStats(); + + /** + * Get the created partitions during the streaming. + * @return created partitions. + */ + List getCreatedPartitions(); } diff --git a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java index 13de1d4719..9ea6f08ba3 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java @@ -19,9 +19,7 @@ package org.apache.hive.streaming; -import java.io.InputStream; import java.util.Properties; -import java.util.Scanner; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; @@ -31,6 +29,8 @@ import org.apache.hadoop.io.BytesWritable; import com.google.common.base.Joiner; +import org.apache.hive.streaming.transaction.FixedStatementIdChooser; +import org.apache.hive.streaming.transaction.StatementIdChooser; /** * Streaming Writer handles delimited input (eg. CSV). 
@@ -46,7 +46,7 @@ private LazySimpleSerDe serde; private StrictDelimitedInputWriter(Builder builder) { - super(builder.lineDelimiter); + super(builder.lineDelimiter, builder.statementIdChooser); this.fieldDelimiter = builder.fieldDelimiter; this.collectionDelimiter = builder.collectionDelimiter; this.mapKeyDelimiter = builder.mapKeyDelimiter; @@ -61,6 +61,7 @@ public static Builder newBuilder() { private char collectionDelimiter = (char) LazySerDeParameters.DefaultSeparators[1]; private char mapKeyDelimiter = (char) LazySerDeParameters.DefaultSeparators[2]; private String lineDelimiter; + private StatementIdChooser statementIdChooser; public Builder withFieldDelimiter(final char fieldDelimiter) { this.fieldDelimiter = fieldDelimiter; @@ -82,7 +83,17 @@ public Builder withLineDelimiterPattern(final String lineDelimiter) { return this; } + public Builder withStatementIdChooser(final StatementIdChooser + statementIdChooser) { + this.statementIdChooser = statementIdChooser; + return this; + } + public StrictDelimitedInputWriter build() { + if (statementIdChooser == null) { + statementIdChooser = FixedStatementIdChooser.getInstance(); + } + return new StrictDelimitedInputWriter(this); } } diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java index 416bd67cfd..2998d803d2 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java @@ -27,6 +27,8 @@ import org.apache.hadoop.io.Text; import com.google.common.base.Joiner; +import org.apache.hive.streaming.transaction.FixedStatementIdChooser; +import org.apache.hive.streaming.transaction.StatementIdChooser; /** * Streaming Writer handles utf8 encoded Json (Strict syntax). @@ -38,7 +40,7 @@ private JsonSerDe serde; public StrictJsonWriter(final Builder builder) { - super(builder.lineDelimiter); + super(builder.lineDelimiter, builder.statementIdChooser); } public static Builder newBuilder() { @@ -47,13 +49,23 @@ public static Builder newBuilder() { public static class Builder { private String lineDelimiter; + private StatementIdChooser statementIdChooser; public Builder withLineDelimiterPattern(final String lineDelimiter) { this.lineDelimiter = lineDelimiter; return this; } + public Builder withStatementIdChooser(final StatementIdChooser + statementIdChooser) { + this.statementIdChooser = statementIdChooser; + return this; + } + public StrictJsonWriter build() { + if (statementIdChooser == null) { + statementIdChooser = FixedStatementIdChooser.getInstance(); + } return new StrictJsonWriter(this); } } diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java index 12516f5914..dd97da6511 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java @@ -31,6 +31,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Lists; +import org.apache.hive.streaming.transaction.FixedStatementIdChooser; +import org.apache.hive.streaming.transaction.StatementIdChooser; /** * Streaming Writer handles text input data with regex. 
Uses @@ -43,7 +45,7 @@ private RegexSerDe serde; private StrictRegexWriter(final Builder builder) { - super(builder.lineDelimiter); + super(builder.lineDelimiter, builder.statementIdChooser); this.regex = builder.regex; } @@ -54,6 +56,7 @@ public static Builder newBuilder() { public static class Builder { private String regex; private String lineDelimiter; + private StatementIdChooser statementIdChooser; public Builder withRegex(final String regex) { this.regex = regex; @@ -65,7 +68,17 @@ public Builder withLineDelimiterPattern(final String lineDelimiter) { return this; } + public Builder withStatementIdChooser(final StatementIdChooser + statementIdChooser) { + this.statementIdChooser = statementIdChooser; + return this; + } + + public StrictRegexWriter build() { + if (statementIdChooser == null) { + statementIdChooser = FixedStatementIdChooser.getInstance(); + } return new StrictRegexWriter(this); } } diff --git a/streaming/src/java/org/apache/hive/streaming/package.html b/streaming/src/java/org/apache/hive/streaming/package.html index 2b45792807..5bfe04ce44 100644 --- a/streaming/src/java/org/apache/hive/streaming/package.html +++ b/streaming/src/java/org/apache/hive/streaming/package.html @@ -43,7 +43,8 @@

HCatalog Streaming API -- high level description

Note on packaging: The APIs are defined in the
-org.apache.hive.streaming Java package and included as
+org.apache.hive.streaming and
+org.apache.hive.streaming.transaction Java packages and included as
the hive-streaming jar.
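For orientation, a client-side implementation of the new StatementIdChooser contract from that transaction package could look like the sketch below. The class name and the sequential policy are invented for illustration; the interface, AcidUtils.MAX_STATEMENTS_PER_TXN and the now-public BucketCodec.MAX_STATEMENT_ID are taken from the patched code. Such a chooser would be handed to a writer through the withStatementIdChooser(...) methods added to the Strict*Writer builders.

    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.io.BucketCodec;
    import org.apache.hive.streaming.transaction.StatementIdChooser;

    // Hypothetical chooser that hands out 0, 1, 2, ... and wraps at the ACID limits,
    // mirroring the bound used by RandomStatementIdChooser in this patch.
    public class SequentialStatementIdChooser implements StatementIdChooser {
      private final int maxId =
          Math.min(AcidUtils.MAX_STATEMENTS_PER_TXN, BucketCodec.MAX_STATEMENT_ID + 1);
      private int current = -1;

      @Override
      public int next() {
        current = (current + 1) % maxId;
        return current;
      }
    }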

STREAMING REQUIREMENTS

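Tying the pieces together, the builder additions to HiveStreamingConnection (withWriteId, withTableObject), the new commitTransactionWithPartition call and the writer-side statement-id chooser are aimed at a process that joins a transaction opened elsewhere. The sketch below is illustrative only: the class, method and database name are hypothetical, the writeId and metastore Table object are assumed to be supplied by the coordinating process, and the partition set is assumed to carry partition names as strings (generics are not visible in this hunk).

    import java.util.Set;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hive.streaming.HiveStreamingConnection;
    import org.apache.hive.streaming.StrictDelimitedInputWriter;
    import org.apache.hive.streaming.transaction.RandomStatementIdChooser;

    public class SecondaryWriterExample {
      static void writeIntoExistingTxn(long writeId, Table tableObject,
          Set<String> extraPartitions, byte[] csvRow) throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            // random starting statementId so writers sharing the writeId rarely collide
            // on the same delta file; the writer retries with the next id on collision
            .withStatementIdChooser(new RandomStatementIdChooser())
            .build();
        HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
            .withDatabase("warehouse_db")
            .withTableObject(tableObject)   // required whenever withWriteId(...) is used
            .withWriteId(writeId)           // connection does not open or manage its own txns
            .withRecordWriter(writer)
            .connect();
        try {
          conn.beginTransaction();          // only one transaction batch is allowed in this mode
          conn.write(csvRow);
          // include partitions that may have been added independently;
          // passing null behaves like plain commitTransaction()
          conn.commitTransactionWithPartition(extraPartitions);
        } finally {
          conn.close();
        }
      }
    }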
diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/AbstractStreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/transaction/AbstractStreamingTransaction.java new file mode 100644 index 0000000000..e9c4fd415e --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/AbstractStreamingTransaction.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hive.streaming.RecordWriter; +import org.apache.hive.streaming.SerializationError; +import org.apache.hive.streaming.StreamingException; + +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Common methods for the implementing classes. + */ +abstract class AbstractStreamingTransaction + implements StreamingTransaction { + + /** + * This variable should be initialized by the children. + */ + protected RecordWriter recordWriter; + + /** + * This variable should be initialized by the children. + */ + protected List txnToWriteIds; + + + /** + * once any operation on this batch encounters a system exception + * (e.g. IOException on write) it's safest to assume that we can't write to the + * file backing this batch any more. This guards important public methods + */ + protected final AtomicBoolean isTxnClosed = new AtomicBoolean(false); + + protected int currentTxnIndex = -1; + protected TxnState state; + + + protected void checkIsClosed() throws StreamingException { + if (isTxnClosed.get()) { + throw new StreamingException("Transaction" + toString() + " is closed()"); + } + } + + protected void beginNextTransactionImpl(String errorMessage) + throws StreamingException{ + state = TxnState.INACTIVE; //clear state from previous txn + if ((currentTxnIndex + 1) >= txnToWriteIds.size()) { + throw new InvalidTransactionState(errorMessage); + } + currentTxnIndex++; + state = TxnState.OPEN; + } + + public void write(final byte[] record) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), record); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won't help since the tuple will be exactly the same + //when it's replayed. 
+ success = true; + throw ex; + } finally { + markDead(success); + } + } + + public void write(final InputStream inputStream) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), inputStream); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won'table help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } finally { + markDead(success); + } + } + + /** + * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue + * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail). + * This ensures that a client can't ignore these failures and continue to write. + */ + protected void markDead(boolean success) throws StreamingException { + if (success) { + return; + } + close(); + } + + public long getCurrentWriteId() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.get(currentTxnIndex).getWriteId(); + } + return -1L; + } + + public int remainingTransactions() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.size() - currentTxnIndex - 1; + } + return txnToWriteIds.size(); + } + + public boolean isClosed() { + return isTxnClosed.get(); + } + + public TxnState getCurrentTransactionState() { + return state; + } + + public long getCurrentTxnId() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.get(currentTxnIndex).getTxnId(); + } + return -1L; + } + + @Override + public List getTxnToWriteIds() { + return txnToWriteIds; + } + + public void commit() throws StreamingException { + commitWithPartitions(null); + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/FixedStatementIdChooser.java b/streaming/src/java/org/apache/hive/streaming/transaction/FixedStatementIdChooser.java new file mode 100644 index 0000000000..95596cf411 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/FixedStatementIdChooser.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +/** + * Retuns always the same statement id. Usually used to return -1 + * and therefore disable the statement id. + */ +public class FixedStatementIdChooser implements StatementIdChooser { + private static FixedStatementIdChooser instance; + private final int fixed; + public FixedStatementIdChooser(int fixed) { + this.fixed = fixed; + } + @Override + public int next() { + return fixed; + } + + /** + * Use this to disable the statement id. 
+ * @return Fixed Instance that always returns -1 + */ + public static synchronized FixedStatementIdChooser getInstance(){ + if(instance == null){ + instance = new FixedStatementIdChooser(-1); + } + return instance; + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java b/streaming/src/java/org/apache/hive/streaming/transaction/InvalidTransactionState.java similarity index 92% rename from streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java rename to streaming/src/java/org/apache/hive/streaming/transaction/InvalidTransactionState.java index 9d92dfa516..fa58aefca4 100644 --- a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java +++ b/streaming/src/java/org/apache/hive/streaming/transaction/InvalidTransactionState.java @@ -16,8 +16,11 @@ * limitations under the License. */ -package org.apache.hive.streaming; +package org.apache.hive.streaming.transaction; +/** + * Invalid transaction. + */ public class InvalidTransactionState extends TransactionError { InvalidTransactionState(String msg) { super(msg); diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/RandomStatementIdChooser.java b/streaming/src/java/org/apache/hive/streaming/transaction/RandomStatementIdChooser.java new file mode 100644 index 0000000000..f0d397ab96 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/RandomStatementIdChooser.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; +import java.util.Random; + +/** + * We have to choose a statement id but we don't know if other + * writers, which are in different process or even different hosts + * may have chose it yet. This algorithm probably can be improved + * but currently it does the following: + * - Choose random int + * - If the delta directory can be created finish + * - If it fails add one to the int until it succeeds + * The max for the statement id is Math.min(AcidUtils.MAX_STATEMENTS_PER_TXN, + * BucketCodec.MAX_STATEMENT_ID + 1) at the moment which is around 5000. 
+ */ +public class RandomStatementIdChooser implements StatementIdChooser { + private int current; + private final int maxId; + public RandomStatementIdChooser() { + maxId = Math.min(AcidUtils.MAX_STATEMENTS_PER_TXN, BucketCodec.MAX_STATEMENT_ID + 1); + current = new Random().nextInt(maxId); + } + public int next() { + return ++current % maxId; + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/StatementIdChooser.java b/streaming/src/java/org/apache/hive/streaming/transaction/StatementIdChooser.java new file mode 100644 index 0000000000..00f7cb6d37 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/StatementIdChooser.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +/** + * Return an int corresponding to the statementId. + */ +public interface StatementIdChooser { + int next(); +} diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/StreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/transaction/StreamingTransaction.java new file mode 100644 index 0000000000..1e54c88204 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/StreamingTransaction.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hive.streaming.StreamingException; +import java.io.InputStream; +import java.util.List; +import java.util.Set; + +/** + * Common interface for transaction in HiveStreamingConnection. 
+ */ +public interface StreamingTransaction { + void beginNextTransaction() throws StreamingException; + void commit() throws StreamingException; + void commitWithPartitions(Set partitions) throws StreamingException; + void abort() throws StreamingException; + void write(byte[] record) throws StreamingException; + void write(InputStream record) throws StreamingException; + void close() throws StreamingException; + + boolean isClosed(); + TxnState getCurrentTransactionState(); + int remainingTransactions(); + long getCurrentTxnId(); + long getCurrentWriteId(); + List getCreatedPartitions(); + List getTxnToWriteIds(); +} diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/transaction/TransactionBatch.java new file mode 100644 index 0000000000..493a85e9af --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/TransactionBatch.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hive.streaming.HiveStreamingConnection; +import org.apache.hive.streaming.StreamingException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Streaming transaction to use most of the times. 
+ * The batch opens its transaction ids and allocates the matching write ids
+ * through the metastore, commits or aborts each transaction as the client
+ * requests, and keeps the open ones alive with a background heartbeat.
+ */
+public class TransactionBatch extends AbstractStreamingTransaction {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TransactionBatch.class.getName());
+  private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000;
+  protected List<String> createdPartitions = null;
+  private String username;
+  private HiveStreamingConnection conn;
+  private ScheduledExecutorService scheduledExecutorService;
+  private String partNameForLock = null;
+  private LockRequest lockRequest = null;
+  // heartbeats can only be sent for open transactions.
+  // there is a race between committing/aborting a transaction and heartbeat.
+  // Example: If a heartbeat is sent for committed txn, exception will be thrown.
+  // Similarly if we don't send a heartbeat, metastore server might abort a txn
+  // for missed heartbeat right before commit txn call.
+  // This lock is used to mutex commit/abort and heartbeat calls
+  private final ReentrantLock transactionLock = new ReentrantLock();
+  // min txn id is incremented linearly within a transaction batch.
+  // keeping minTxnId atomic as it is shared with heartbeat thread
+  private final AtomicLong minTxnId;
+  // max txn id does not change for a transaction batch
+  private final long maxTxnId;
+
+  private String agentInfo;
+  private int numTxns;
+  /**
+   * Tracks the state of each transaction.
+   */
+  private TxnState[] txnStatus;
+  /**
+   * ID of the last txn used by {@link #beginNextTransactionImpl()}.
+   */
+  private long lastTxnUsed;
+
+  /**
+   * Represents a batch of transactions acquired from MetaStore.
+   *
+   * @param conn - hive streaming connection
+   * @throws StreamingException if failed to create new RecordUpdater for batch
+   */
+  public TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
+    boolean success = false;
+    try {
+      if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
+        List<FieldSchema> partKeys = conn.getTable().getPartitionKeys();
+        partNameForLock = Warehouse.makePartName(partKeys, conn.getStaticPartitionValues());
+      }
+      this.conn = conn;
+      this.username = conn.getUsername();
+      this.recordWriter = conn.getRecordWriter();
+      this.agentInfo = conn.getAgentInfo();
+      this.numTxns = conn.getTransactionBatchSize();
+
+      setupHeartBeatThread();
+
+      List<Long> txnIds = openTxnImpl(username, numTxns);
+      txnToWriteIds = allocateWriteIdsImpl(txnIds);
+      assert (txnToWriteIds.size() == numTxns);
+
+      txnStatus = new TxnState[numTxns];
+      for (int i = 0; i < txnStatus.length; i++) {
+        assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
+        txnStatus[i] = TxnState.OPEN; //Open matches Metastore state
+      }
+      this.state = TxnState.INACTIVE;
+
+      // initialize record writer with connection and write id info
+      recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns - 1).getWriteId());
+      this.minTxnId = new AtomicLong(txnIds.get(0));
+      this.maxTxnId = txnIds.get(txnIds.size() - 1);
+      success = true;
+    } catch (TException e) {
+      throw new StreamingException(conn.toString(), e);
+    } finally {
+      //clean up if above throws
+      markDead(success);
+    }
+  }
+
+  private static class HeartbeatRunnable implements Runnable {
+    private final HiveStreamingConnection conn;
+    private final AtomicLong minTxnId;
+    private final long maxTxnId;
+    private final ReentrantLock transactionLock;
+    private final AtomicBoolean isTxnClosed;
+
+    HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId,
+
final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) { + this.conn = conn; + this.minTxnId = minTxnId; + this.maxTxnId = maxTxnId; + this.transactionLock = transactionLock; + this.isTxnClosed = isTxnClosed; + } + + @Override + public void run() { + transactionLock.lock(); + try { + if (minTxnId.get() > 0) { + HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId); + if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { + LOG.error("Heartbeat failure: {}", resp.toString()); + isTxnClosed.set(true); + } else { + LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId); + } + } + } catch (TException e) { + LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e); + } finally { + transactionLock.unlock(); + } + } + } + + private void setupHeartBeatThread() { + // start heartbeat thread + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("HiveStreamingConnection-Heartbeat-Thread") + .build(); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + long heartBeatInterval; + long initialDelay; + try { + // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2 + heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.getConf()); + } catch (LockException e) { + heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL; + } + // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager) + initialDelay = (long) (heartBeatInterval * 0.75 * Math.random()); + LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", + heartBeatInterval, initialDelay, conn.getAgentInfo()); + Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed); + this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit + .MILLISECONDS); + } + + private List openTxnImpl(final String user, final int numTxns) throws TException { + return conn.getMSC().openTxns(user, numTxns).getTxn_ids(); + } + + private List allocateWriteIdsImpl(final List txnIds) throws TException { + return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.getDatabase(), + conn.getTable().getTableName()); + } + + @Override + public String toString() { + if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { + return "{}"; + } + StringBuilder sb = new StringBuilder(" TxnStatus["); + for (TxnState state : txnStatus) { + //'state' should not be null - future proofing + sb.append(state == null ? "N" : state); + } + sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); + return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId() + + "/" + txnToWriteIds.get(0).getWriteId() + + "..." 
+ + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId() + + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId() + + "] on connection = " + conn + "; " + sb; + } + + public void beginNextTransaction() throws StreamingException { + checkIsClosed(); + beginNextTransactionImpl(); + } + + private void beginNextTransactionImpl() throws StreamingException { + beginNextTransactionImpl("No more transactions available in" + + " next batch for connection: " + conn + " user: " + username); + lastTxnUsed = getCurrentTxnId(); + lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo); + createdPartitions = Lists.newArrayList(); + try { + LockResponse res = conn.getMSC().lock(lockRequest); + if (res.getState() != LockState.ACQUIRED) { + throw new TransactionError("Unable to acquire lock on " + conn); + } + } catch (TException e) { + throw new TransactionError("Unable to acquire lock on " + conn, e); + } + } + + public void commitWithPartitions(Set partitions) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + commitImpl(partitions); + success = true; + } finally { + markDead(success); + } + } + + private void commitImpl(Set partitions) throws StreamingException { + try { + recordWriter.flush(); + TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); + if (conn.isDynamicPartitioning()) { + List partNames = new ArrayList<>(recordWriter.getPartitions()); + createdPartitions.addAll(partNames); + if (partitions != null) { + partNames.addAll(partitions); + } + conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), + txnToWriteId.getWriteId(), conn.getDatabase(), + conn.getTable().getTableName(), partNames, + DataOperationType.INSERT); + } + transactionLock.lock(); + try { + conn.getMSC().commitTxn(txnToWriteId.getTxnId()); + // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. + // the current transaction is going to committed or fail, so don't need heartbeat for current transaction. + if (currentTxnIndex + 1 < txnToWriteIds.size()) { + minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); + } else { + // exhausted the batch, no longer have to heartbeat for current txn batch + minTxnId.set(-1); + } + } finally { + transactionLock.unlock(); + } + state = TxnState.COMMITTED; + txnStatus[currentTxnIndex] = TxnState.COMMITTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TxnAbortedException e) { + throw new TransactionError("Aborted transaction " + + "cannot be committed", e); + } catch (TException e) { + throw new TransactionError("Unable to commitTransaction transaction" + + getCurrentTxnId(), e); + } + } + + public void abort() throws StreamingException { + if (isTxnClosed.get()) { + /* + * isDead is only set internally by this class. 
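+ * It is set when the batch is closed or when an operation on it fails.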
{@link #markDead(boolean)} will abort all + * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction() + * error doesn't get misleading errors + */ + return; + } + abort(false); + } + + private void abort(final boolean abortAllRemaining) throws StreamingException { + abortImpl(abortAllRemaining); + } + + private void abortImpl(boolean abortAllRemaining) throws StreamingException { + if (minTxnId == null) { + return; + } + + transactionLock.lock(); + try { + if (abortAllRemaining) { + // we are aborting all txns in the current batch, so no need to heartbeat + minTxnId.set(-1); + //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn + //so we need to start from next one, if any. Also if batch was created but + //fetchTransactionBatch() was never called, we want to start with first txn + int minOpenTxnIndex = Math.max(currentTxnIndex + + (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); + for (currentTxnIndex = minOpenTxnIndex; + currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) { + conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); + txnStatus[currentTxnIndex] = TxnState.ABORTED; + } + currentTxnIndex--; //since the loop left it == txnToWriteIds.size() + } else { + // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat + // if the current txn is last in the batch. + if (currentTxnIndex + 1 < txnToWriteIds.size()) { + minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId()); + } else { + // exhausted the batch, no longer have to heartbeat + minTxnId.set(-1); + } + long currTxnId = getCurrentTxnId(); + if (currTxnId > 0) { + conn.getMSC().rollbackTxn(currTxnId); + txnStatus[currentTxnIndex] = TxnState.ABORTED; + } + } + state = TxnState.ABORTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Unable to abort invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TException e) { + throw new TransactionError("Unable to abort transaction id : " + + getCurrentTxnId(), e); + } finally { + transactionLock.unlock(); + } + } + + /** + * Close the TransactionBatch. This will abort any still open txns in this batch. 
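+ * It also stops the heartbeat thread and closes the record writer.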
+ * + * @throws StreamingException - failure when closing transaction batch + */ + public void close() throws StreamingException { + if (isClosed()) { + return; + } + isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async + try { + abort(true); //abort all remaining txns + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to abort", ex); + } + try { + closeImpl(); + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to close", ex); + } + } + + private void closeImpl() throws StreamingException { + state = TxnState.INACTIVE; + recordWriter.close(); + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + } + + private static LockRequest createLockRequest(final HiveStreamingConnection connection, + String partNameForLock, String user, long txnId, String agentInfo) { + LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo); + requestBuilder.setUser(user); + requestBuilder.setTransactionId(txnId); + + LockComponentBuilder lockCompBuilder = new LockComponentBuilder() + .setDbName(connection.getDatabase()) + .setTableName(connection.getTable().getTableName()) + .setShared() + .setOperationType(DataOperationType.INSERT); + if (connection.isDynamicPartitioning()) { + lockCompBuilder.setIsDynamicPartitionWrite(true); + } + if (partNameForLock != null && !partNameForLock.isEmpty()) { + lockCompBuilder.setPartitionName(partNameForLock); + } + requestBuilder.addLockComponent(lockCompBuilder.build()); + + return requestBuilder.build(); + } + + @Override + public List getCreatedPartitions() { + return createdPartitions; + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java b/streaming/src/java/org/apache/hive/streaming/transaction/TransactionError.java similarity index 81% rename from streaming/src/java/org/apache/hive/streaming/TransactionError.java rename to streaming/src/java/org/apache/hive/streaming/transaction/TransactionError.java index ae56e7c726..98dc613b3e 100644 --- a/streaming/src/java/org/apache/hive/streaming/TransactionError.java +++ b/streaming/src/java/org/apache/hive/streaming/transaction/TransactionError.java @@ -16,14 +16,19 @@ * limitations under the License. */ -package org.apache.hive.streaming; +package org.apache.hive.streaming.transaction; +import org.apache.hive.streaming.StreamingException; + +/** + * Transaction error. + */ public class TransactionError extends StreamingException { - TransactionError(String msg, Exception e) { + public TransactionError(String msg, Exception e) { super(msg + (e == null ? "" : ": " + e.getMessage()), e); } - TransactionError(String msg) { + public TransactionError(String msg) { super(msg); } } diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/TxnState.java b/streaming/src/java/org/apache/hive/streaming/transaction/TxnState.java new file mode 100644 index 0000000000..7b12d116c4 --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/TxnState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +/** + * States for a transaction. + */ +public enum TxnState { + INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"), + PREPARED_FOR_COMMIT("P"); + + private final String code; + + TxnState(String code) { + this.code = code; + } + + public String toString() { + return code; + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/UnManagedSingleTransaction.java b/streaming/src/java/org/apache/hive/streaming/transaction/UnManagedSingleTransaction.java new file mode 100644 index 0000000000..e471812dcb --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/UnManagedSingleTransaction.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming.transaction; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hive.streaming.HiveStreamingConnection; +import org.apache.hive.streaming.StreamingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * Receives a single writeId. Doesn't open connections to the metastore + * so the commit has to be done externally by the entity that created + * the writeId. 
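+ * <p>
+ * Committing only flushes the record writer and moves the transaction to
+ * {@link TxnState#PREPARED_FOR_COMMIT}; no metastore locks or heartbeats are
+ * taken, and the owner of the write id is expected to commit or abort the
+ * enclosing transaction.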
+ */ +public class UnManagedSingleTransaction extends AbstractStreamingTransaction { + private static final Logger LOG = LoggerFactory.getLogger( + UnManagedSingleTransaction.class.getName()); + private final String username; + private final HiveStreamingConnection conn; + private final List createdPartitions = Lists.newArrayList(); + + public UnManagedSingleTransaction(HiveStreamingConnection conn) + throws StreamingException{ + assert conn.getWriteId() != null; + + this.conn = conn; + this.username = conn.getUsername(); + this.recordWriter = conn.getRecordWriter(); + this.state = TxnState.INACTIVE; + + txnToWriteIds = Lists.newArrayList(new TxnToWriteId(-1, + conn.getWriteId())); + + boolean success = false; + try { + recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(0).getWriteId()); + success = true; + } finally { + markDead(success); + } + } + + @Override + public void beginNextTransaction() throws StreamingException { + beginNextTransactionImpl("No more transactions available in" + + " next batch for connection: " + conn + " user: " + username); + } + + @Override + public void commitWithPartitions(Set partitions) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + commitImpl(); + success = true; + } finally { + markDead(success); + } + } + + private void commitImpl() throws StreamingException { + recordWriter.flush(); + List partNames = new ArrayList<>(recordWriter.getPartitions()); + createdPartitions.addAll(partNames); + state = TxnState.PREPARED_FOR_COMMIT; + } + + @Override + public void abort() { + if (isTxnClosed.get()) { + return; + } + state = TxnState.ABORTED; + } + + @Override + public void close() throws StreamingException { + if (isClosed()) { + return; + } + isTxnClosed.set(true); + abort(); + try { + closeImpl(); + } catch (Exception ex) { + LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex); + throw new StreamingException("Unable to close", ex); + } + } + + private void closeImpl() throws StreamingException { + state = TxnState.INACTIVE; + recordWriter.close(); + } + + @Override + public String toString() { + if (txnToWriteIds == null || txnToWriteIds.isEmpty()) { + return "{}"; + } + return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getWriteId() + + "] on connection = " + conn + "; " + "status=" + state; + } + + @Override + public List getCreatedPartitions() { + return createdPartitions; + } +} diff --git a/streaming/src/java/org/apache/hive/streaming/transaction/package-info.java b/streaming/src/java/org/apache/hive/streaming/transaction/package-info.java new file mode 100644 index 0000000000..3c830120ef --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/transaction/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package grouping transaction classes. + */ +package org.apache.hive.streaming.transaction; diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 8b5e508d94..f70bf98a05 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -66,7 +66,6 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; -import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; @@ -82,6 +81,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.RecordReader; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.compactor.Worker; @@ -96,6 +96,10 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hive.streaming.transaction.RandomStatementIdChooser; +import org.apache.hive.streaming.transaction.StatementIdChooser; +import org.apache.hive.streaming.transaction.TransactionError; +import org.apache.hive.streaming.transaction.TxnState; import org.apache.orc.impl.OrcAcidUtils; import org.apache.orc.tools.FileDump; import org.apache.thrift.TException; @@ -150,7 +154,8 @@ public FileStatus getFileStatus(Path path) throws IOException { if (file.canExecute()) { mod |= 0111; } - return new FileStatus(file.length(), file.isDirectory(), 1, 1024, + return new FileStatus(file.length(), file.isDirectory(), + 1, 1024, file.lastModified(), file.lastModified(), FsPermission.createImmutable(mod), "owen", "users", path); } @@ -418,6 +423,109 @@ public void testNoBuckets() throws Exception { Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000")); } + @Test + public void testConnectionWithWriteId() throws Exception { + queryTable(driver, "drop table if exists default.writeidconnection"); + queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " + + "TBLPROPERTIES('transactional'='true')"); + queryTable(driver, "insert into default.writeidconnection values('a0','bar')"); + + List rs = queryTable(driver, "select * from default.writeidconnection"); + Assert.assertEquals(1, rs.size()); + Assert.assertEquals("a0\tbar", rs.get(0)); + + StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("writeidconnection") + .withRecordWriter(writerT) + .withHiveConf(conf) + .connect(); + transactionConnection.beginTransaction(); + + Table tObject = transactionConnection.getTable(); + Long writeId = transactionConnection.getCurrentWriteId(); + + Assert.assertNotNull(tObject); + Assert.assertNotNull(writeId); + + StatementIdChooser 
sicOne = new RandomStatementIdChooser(); + StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .withStatementIdChooser(sicOne) + .build(); + HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("writeidconnection") + .withRecordWriter(writerOne) + .withHiveConf(conf) + .withWriteId(writeId) + .withTableObject(tObject) + .connect(); + + StatementIdChooser sicTwo = new RandomStatementIdChooser(); + StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .withStatementIdChooser(sicTwo) + .build(); + HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("writeidconnection") + .withRecordWriter(writerTwo) + .withHiveConf(conf) + .withWriteId(writeId) + .withTableObject(tObject) + .connect(); + + Assert.assertNotNull(connectionOne); + Assert.assertNotNull(connectionTwo); + + connectionOne.beginTransaction(); + connectionTwo.beginTransaction(); + connectionOne.write("a1,b2".getBytes()); + connectionTwo.write("a5,b6".getBytes()); + connectionOne.write("a3,b4".getBytes()); + connectionOne.commitTransaction(); + connectionTwo.commitTransaction(); + + Assert.assertEquals(TxnState.PREPARED_FOR_COMMIT, + connectionOne.getCurrentTransactionState()); + Assert.assertEquals(TxnState.PREPARED_FOR_COMMIT, + connectionTwo.getCurrentTransactionState()); + + try { + connectionOne.beginTransaction(); + Assert.fail("second beginTransaction should have thrown a " + + "StreamingException"); + } catch (StreamingException e) { + + } + + connectionOne.close(); + connectionTwo.close(); + + rs = queryTable(driver, "select ROW__ID, a, b, " + + "INPUT__FILE__NAME from default.writeidconnection order by ROW__ID"); + // Nothing here since it hasn't been committed + Assert.assertEquals(1, rs.size()); + + transactionConnection.commitTransaction(); + + rs = queryTable(driver, "select ROW__ID, a, b, " + + "INPUT__FILE__NAME from default.writeidconnection order by a"); + Assert.assertEquals(4, rs.size()); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000")); + } + @Test public void testAllTypesDelimitedWriter() throws Exception { queryTable(driver, "drop table if exists default.alltypes"); @@ -977,8 +1085,8 @@ public void testTransactionBatchEmptyCommit() throws Exception { .connect(); connection.beginTransaction(); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); // 2) To unpartitioned table @@ -995,8 +1103,8 @@ public void testTransactionBatchEmptyCommit() throws Exception { connection.beginTransaction(); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + 
Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1115,8 +1223,8 @@ public void testTransactionBatchEmptyAbort() throws Exception { connection.beginTransaction(); connection.abortTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); // 2) to unpartitioned table @@ -1133,8 +1241,8 @@ public void testTransactionBatchEmptyAbort() throws Exception { connection.beginTransaction(); connection.abortTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1156,20 +1264,20 @@ public void testTransactionBatchCommitDelimited() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); // 2nd Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("2,Welcome to streaming".getBytes()); // data should not be visible @@ -1182,8 +1290,8 @@ public void testTransactionBatchCommitDelimited() throws Exception { connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); // To Unpartitioned table @@ -1199,13 +1307,13 @@ public void testTransactionBatchCommitDelimited() throws Exception { .connect(); // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1227,20 +1335,20 @@ public void testTransactionBatchCommitRegex() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1,Hello streaming".getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); // 2nd Txn connection.beginTransaction(); - 
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("2,Welcome to streaming".getBytes()); // data should not be visible @@ -1252,8 +1360,8 @@ public void testTransactionBatchCommitRegex() throws Exception { "{2, Welcome to streaming}"); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); // To Unpartitioned table regex = "([^:]*):(.*)"; @@ -1271,13 +1379,13 @@ public void testTransactionBatchCommitRegex() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write("1:Hello streaming".getBytes()); connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1328,20 +1436,20 @@ public void testTransactionBatchCommitJson() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}"; connection.write(rec1.getBytes()); connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); List rs = queryTable(driver, "select * from " + dbName + "." 
+ tblName); Assert.assertEquals(1, rs.size()); @@ -1363,7 +1471,7 @@ public void testJsonInputStream() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, connection.getCurrentTransactionState()); String records = "{\"id\" : 1, \"msg\": \"Hello streaming\"}|{\"id\" : 2, \"msg\": \"Hello world\"}|{\"id\" : 3, " + "\"msg\": \"Hello world!!\"}"; ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes()); @@ -1399,20 +1507,20 @@ public void testRemainingTransactions() throws Exception { connection.beginTransaction(); Assert.assertEquals(--initialCount, connection.remainingTransactions()); for (int rec = 0; rec < 2; ++rec) { - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write((batch * rec + ",Hello streaming").getBytes()); } connection.commitTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); ++batch; } Assert.assertEquals(0, connection.remainingTransactions()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); connection = HiveStreamingConnection.newBuilder() .withDatabase(dbName) @@ -1430,20 +1538,20 @@ public void testRemainingTransactions() throws Exception { connection.beginTransaction(); Assert.assertEquals(--initialCount, connection.remainingTransactions()); for (int rec = 0; rec < 2; ++rec) { - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, + connection.getCurrentTransactionState()); connection.write((batch * rec + ",Hello streaming").getBytes()); } connection.abortTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); ++batch; } Assert.assertEquals(0, connection.remainingTransactions()); connection.close(); - Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.INACTIVE, + connection.getCurrentTransactionState()); } @Test @@ -1468,8 +1576,8 @@ public void testTransactionBatchAbort() throws Exception { checkNothingWritten(partLoc); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.close(); @@ -1507,8 +1615,8 @@ public void testTransactionBatchAbortAndCommit() throws Exception { checkNothingWritten(partLoc); - Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.ABORTED, + connection.getCurrentTransactionState()); connection.beginTransaction(); connection.write("1,Hello streaming".getBytes()); @@ -1577,8 +1685,8 @@ public void testMultipleTransactionBatchCommits() throws Exception { "2\tWelcome to streaming", "3\tHello streaming - once again", "4\tWelcome to streaming - once again"); - 
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); connection.close(); } @@ -1690,10 +1798,10 @@ public void testInterleavedTransactionBatchCommits() throws Exception { "3\tHello streaming - once again", "4\tWelcome to streaming - once again"); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection.getCurrentTransactionState()); - Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED - , connection2.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.COMMITTED, + connection2.getCurrentTransactionState()); connection.close(); connection2.close(); @@ -2511,7 +2619,8 @@ private void recordOffsets(final HiveConf conf, final String dbLocation, } @Test - public void testErrorHandling() throws Exception { + public void testErrorHandling() + throws Exception { String agentInfo = "UT_" + Thread.currentThread().getName(); runCmdOnDriver("create database testErrors"); runCmdOnDriver("use testErrors"); @@ -2538,8 +2647,12 @@ public void testErrorHandling() throws Exception { GetOpenTxnsInfoResponse r = msClient.showTxns(); Assert.assertEquals("HWM didn'table match", 17, r.getTxn_high_water_mark()); List ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); - Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + Assert.assertEquals("wrong status ti(0)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(1).getState()); try { @@ -2621,10 +2734,16 @@ public void testErrorHandling() throws Exception { r = msClient.showTxns(); Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); - Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); + Assert.assertEquals("wrong status ti(0)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(0).getState()); + Assert.assertEquals("wrong status ti(1)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(1).getState()); //txnid 3 was committed and thus not open - Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState()); + Assert.assertEquals("wrong status ti(2)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(2).getState()); connection.close(); writer.disableErrors(); @@ -2651,8 +2770,12 @@ public void testErrorHandling() throws Exception { r = msClient.showTxns(); Assert.assertEquals("HWM didn'table match", 21, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); - Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState()); - Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState()); + Assert.assertEquals("wrong status ti(3)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(3).getState()); + Assert.assertEquals("wrong status ti(4)", + org.apache.hadoop.hive.metastore.api.TxnState.ABORTED, + ti.get(4).getState()); } // assumes un partitioned table @@ -2954,4 +3077,5 @@ void disableErrors() { shouldThrow = false; } } + } diff --git 
a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java index 32a6d06c2b..d9b78ffea8 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hive.streaming.transaction.TxnState; import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; @@ -631,7 +632,7 @@ public void testJsonInputStreamDP() throws Exception { // 1st Txn connection.beginTransaction(); - Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState()); + Assert.assertEquals(TxnState.OPEN, connection.getCurrentTransactionState()); String records = "{\"id\" : 1, \"msg\": \"Hello streaming\", \"continent\": \"Asia\", \"Country\": \"India\"}|" + "{\"id\" : 2, \"msg\": \"Hello world\", \"continent\": \"Europe\", \"Country\": \"Germany\"}|" + "{\"id\" : 3, \"msg\": \"Hello world!!\", \"continent\": \"Asia\", \"Country\": \"China\"}|" +