tableNames, Throwable t);
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.png hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.png
new file mode 100644
index 0000000..0055961
Binary files /dev/null and hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.png differ
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
new file mode 100644
index 0000000..72daee8
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
@@ -0,0 +1,474 @@
+
+
+
+
+
+
+
+
+HCatalog Streaming Mutation API
+
+
+
+
+HCatalog Streaming Mutation API -- high level description
+
+Background
+
+In certain data processing use cases it is necessary to modify existing
+data when new facts arrive. An example of this is the classic ETL merge
+where a copy of a data set is kept in sync with a master by the frequent
+application of deltas. The deltas describe the mutations (inserts,
+updates, deletes) that have occurred to the master since the previous
+sync. To implement such a case using Hadoop traditionally demands that
+the partitions containing records targeted by the mutations be
+rewritten. This is a coarse approach; a partition containing millions of
+records might be rebuilt because of a single record change. Additionally
+these partitions cannot be restated atomically; at some point the old
+partition data must be swapped with the new partition data. When this
+swap occurs, usually by issuing an HDFS rm followed by a mv, there is a
+window during which the data appears to be unavailable and hence any
+downstream jobs consuming the data might unexpectedly fail.
+Therefore data processing patterns that restate raw data on HDFS cannot
+operate robustly without some external mechanism to orchestrate
+concurrent access to changing data.
+
+
+
+The availability of ACID tables in Hive provides a mechanism that both
+enables concurrent access to data stored in HDFS (so long as it's in the
+ORC+ACID format), and also permits row-level mutations of records within
+a table, without the need to rewrite the existing data. But while Hive
+itself supports INSERT, UPDATE and DELETE commands, and the ORC format
+can support large batches of mutations in a transaction, Hive's
+execution engine currently submits each individual mutation operation in
+a separate transaction and issues table scans (M/R jobs) to execute
+them. It does not currently scale to the demands of processing large
+deltas in an atomic manner. Furthermore it would be advantageous to
+extend atomic batch mutation capabilities beyond Hive by making them
+available to other data processing frameworks. The Streaming Mutation
+API does just this.
+
+
+The Streaming Mutation API, although similar to the Streaming API, has a
+number of differences and is built to enable very different use cases.
+Superficially, the Streaming API can only write new data whereas the
+mutation API can also modify existing data. However, the two APIs are
+also based on very different transaction models. The Streaming API
+focuses on surfacing a continuous stream of new data into a Hive table
+and does so by batching small sets of writes into multiple short-lived
+transactions. Conversely the mutation API is designed to infrequently
+apply large sets of mutations to a data set in an atomic fashion; all
+mutations will either be applied or they will not. This instead mandates
+the use of a single long-lived transaction. This table summarises the
+attributes of each API:
+
+
+
+
+| Attribute | Streaming API | Mutation API |
+| Ingest type | Data arrives continuously | Ingests are performed periodically and the mutations are applied in a single batch |
+| Transaction scope | Transactions are created for small batches of writes | The entire set of mutations should be applied within a single transaction |
+| Data availability | Surfaces new data to users frequently and quickly | Change sets should be applied atomically; either the effect of the delta is visible or it is not |
+| Sensitive to record order | No, records do not have pre-existing lastTxnIds or bucketIds. Records are likely being written into a single partition (today's date for example) | Yes, all mutated records have existing RecordIdentifiers and must be grouped by (partitionValues, bucketId) and sorted by lastTxnId. These record coordinates initially arrive in an effectively random order |
+| Impact of a write failure | Transaction can be aborted and the producer can choose to resubmit failed records as ordering is not important | Ingest must be halted and failed records resubmitted to preserve the required sequence |
+| User perception of missing data | Data has not arrived yet → "latency?" | "This data is inconsistent, some records have been updated, but other related records have not" - consider here the classic transfer between bank accounts scenario |
+| API end point scope | A given HiveEndPoint instance submits many transactions to a specific bucket, in a specific partition, of a specific table | A set of MutatorCoordinators writes changes to an unknown set of buckets, of an unknown set of partitions, of specific tables (can be more than one), within a single transaction |
+
+
+
+
+Structure
+The API comprises two main concerns: transaction management, and the
+writing of mutation operations to the data set. The two concerns have
+minimal coupling as it is expected that transactions will be initiated
+from a single job-launcher-type process while the writing of mutations
+will be scaled out across any number of worker nodes. In the context of
+Hadoop M/R these can be more concretely defined as the Tool and
+Map/Reduce task components. However, use of this architecture is not
+mandated and in fact both concerns could be handled within a single
+simple process depending on the requirements.
+
+Note that a suitably configured Hive instance is required to operate
+this system even if you do not intend to access the data from within
+Hive. Internally, transactions are managed by the Hive MetaStore.
+Mutations are applied directly to HDFS via ORC APIs that bypass the
+MetaStore. Additionally you may wish to configure your MetaStore
+instance to perform periodic data compactions.
+
+
+Note on packaging: The APIs are defined in the org.apache.hive.hcatalog.streaming.mutate
+Java package and are shipped in the hive-hcatalog-streaming jar.
+
+
+Data requirements
+
+Generally speaking, to apply a mutation to a record one must have some
+unique key that identifies the record. However, primary keys are not a
+construct provided by Hive. Internally Hive uses RecordIdentifiers
+stored in a virtual ROW__ID column to uniquely identify records within
+an ACID table. Therefore, any process that wishes to issue mutations to
+a table via this API must have available the corresponding row ids for
+the target records. What this means in practice is that the process
+issuing mutations must first read in a current snapshot of the data and
+then join the mutations on some domain-specific primary key to obtain
+the corresponding Hive ROW__ID. This is effectively what occurs within
+Hive's table scan process when an UPDATE or DELETE statement is
+executed. The AcidInputFormat provides access to this data via
+AcidRecordReader.getRecordIdentifier().
+
+
+The implementation of the ACID format places some constraints on the
+order in which records are written and it is important that this
+ordering is enforced. Additionally, by grouping data appropriately it's
+possible to parallelise the writing of mutations for the purposes of
+scaling. Finally, to correctly bucket new records (inserts) there is a
+slightly unintuitive trick that must be applied.
+
+All of these data sequencing concerns are the responsibility of the
+client process calling the API, which is assumed to have first-class
+grouping and sorting capabilities (Hadoop Map/Reduce, etc.). The
+streaming API provides nothing more than a validator that fails fast
+when it encounters records that are out of sequence.
+
+In short, API client processes should prepare data for the mutate
+API like so:
+
+- MUST: Order records by ROW__ID.originalTxn, then ROW__ID.rowId.
+- MUST: Assign a ROW__ID containing a computed bucketId to records to be
+inserted.
+- SHOULD: Group/partition by table partition value, then ROW__ID.bucketId.
+
+
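+As an illustration only, the ordering within each (partitionValues,
+bucketId) group can be expressed as a simple comparator over the
+existing RecordIdentifiers. This is a minimal sketch; it assumes your
+merge process can expose each record's RecordIdentifier (for example via
+a RecordInspector), grouping is left to your own shuffle/partitioning
+logic, and imports (java.util.Comparator,
+org.apache.hadoop.hive.ql.io.RecordIdentifier) are omitted for brevity:
+
+  // Sketch: sort key for mutation records, ROW__ID.originalTxn first, then ROW__ID.rowId.
+  Comparator<RecordIdentifier> mutationOrder = new Comparator<RecordIdentifier>() {
+    @Override
+    public int compare(RecordIdentifier a, RecordIdentifier b) {
+      int byTxn = Long.compare(a.getTransactionId(), b.getTransactionId());
+      return byTxn != 0 ? byTxn : Long.compare(a.getRowId(), b.getRowId());
+    }
+  };
+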
+
+The addition of bucket ids to insert records prior to grouping and
+sorting may seem unintuitive. However, it is required both to ensure
+adequate partitioning of new data and bucket allocation consistent with
+that provided by Hive. In a typical ETL the majority of mutation events
+are inserts, often targeting a single partition (new data for the
+previous day, hour, etc.) If more than one worker is writing said
+events, were we to leave the bucket id empty then all inserts would go
+to a single worker (e.g: reducer) and the workload could be heavily
+skewed. The assignment of a computed bucket allows inserts to be more
+usefully distributed across workers. Additionally, when Hive is working
+with the data it may expect records to have been bucketed in a way that
+is consistent with its own internal scheme. A convenience interface and
+implementation are provided to more easily compute and append bucket
+ids: BucketIdAppender and BucketIdAppenderImpl.
+
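+For example, a worker might attach bucket ids to insert records as in
+this minimal sketch; it assumes a MutatorFactory implementation is
+available (such as the ReflectiveMutatorFactory test class included with
+this change) and that totalBuckets and record come from your own
+process:
+
+  // Sketch: compute and attach a bucket id to a record that is about to be inserted.
+  BucketIdAppender appender = mutatorFactory.newBucketIdAppender(totalBuckets);
+  Object bucketedRecord = appender.attachBucketIdToRecord(record);
+  // The attached ROW__ID carries only the computed bucketId; the transactionId and
+  // rowId are placeholders that the RecordUpdater replaces on insert.
+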
+
+Update operations should not attempt to modify values of
+partition or bucketing columns. The API does not prevent this and such
+attempts could lead to data corruption.
+
+Streaming requirements
+A few things are currently required to use streaming.
+
+- Currently, only the ORC storage format is supported. So 'stored as
+orc' must be specified during table creation.
+- The Hive table must be bucketed, but not sorted. So something like
+'clustered by (colName) into 10 buckets' must be specified during table
+creation.
+- The user of the client streaming process must have the necessary
+permissions to write to the table or partition and to create partitions
+in the table.
+- Settings required in hive-site.xml for the Metastore:
+  - hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
+  - hive.support.concurrency = true
+  - hive.compactor.initiator.on = true
+  - hive.compactor.worker.threads > 0
+
+
+
+
+
+
+Note: Streaming mutations to unpartitioned tables is also
+supported.
+
+
+Record layout
+
+The structure, layout, and encoding of records is the exclusive concern
+of the client ETL mutation process and may be quite different from the
+target Hive ACID table. The mutation API requires concrete
+implementations of the MutatorFactory and Mutator classes to extract
+pertinent data from records and serialize data into the ACID files.
+Fortunately supporting classes are provided (AbstractMutator,
+RecordInspectorImpl) to simplify this effort and usually all that is
+required is the specification of a suitable ObjectInspector and the
+provision of the indexes of the ROW__ID and bucketed columns within the
+record structure. Note that all column indexes in these classes are with
+respect to your record structure, not the Hive table structure.
+
+
+You will likely also want to use a BucketIdAppender to append bucket ids
+to new records for insertion. Fortunately the core implementation is
+provided in BucketIdAppenderImpl, but note that bucket column indexes
+must be presented in the same order as they are in the Hive table
+definition to ensure consistent bucketing.
+
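+For illustration only, a factory for a simple Java bean record could
+look like the following sketch, which mirrors the ReflectiveMutatorFactory
+and MutableRecord test classes included with this change. The
+BeanMutatorFactory name is ours, the column indexes are specific to that
+record layout, and imports are omitted for brevity:
+
+  // Sketch: record column 2 holds the ROW__ID, column 0 is the single bucketing column.
+  public class BeanMutatorFactory implements MutatorFactory {
+    private final ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+        MutableRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    private final int recordIdColumn = 2;              // index within the record, not the Hive table
+    private final int[] bucketColumns = new int[] { 0 };
+    private final Configuration conf;
+
+    public BeanMutatorFactory(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath,
+        int bucketId) throws IOException {
+      // AbstractMutator supplies the RecordUpdater plumbing; we only declare how to inspect records.
+      return new AbstractMutator(conf, outputFormat, transactionId, partitionPath, bucketId) {
+        @Override
+        protected ObjectInspector getObjectInspector() {
+          return inspector;
+        }
+
+        @Override
+        protected int getRecordIdColumn() {
+          return recordIdColumn;
+        }
+      };
+    }
+
+    @Override
+    public RecordInspector newRecordInspector() {
+      return new RecordInspectorImpl(inspector, recordIdColumn);
+    }
+
+    @Override
+    public BucketIdAppender newBucketIdAppender(int totalBuckets) {
+      return new BucketIdAppenderImpl(inspector, recordIdColumn, totalBuckets, bucketColumns);
+    }
+  }
+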
+
+Connection and Transaction management
+
+The MutatorClient class is used to create and manage transactions in
+which mutations can be performed. The scope of a transaction can extend
+across multiple ACID tables. When a client connects it communicates with
+the meta store to verify and acquire meta data for the target tables. An
+invocation of newTransaction then opens a transaction with the meta
+store, finalises a collection of MutatorDestinations and returns a new
+Transaction instance. The mutation destinations are light-weight,
+serializable, and immutable objects that are used by the mutation
+writing components of the API to target specific ACID file locations.
+
+
+As you would expect, a Transaction must be initiated with a call to
+begin before any mutations can be applied. This invocation acquires a
+lock on the targeted tables using the meta store, and initiates a
+heartbeat to prevent transaction timeouts. It is highly recommended that
+you register a LockFailureListener with the client so that your process
+can handle any lock or transaction failures. Typically you may wish to
+abort the job in the event of such an error. With the transaction in
+place you can now start streaming mutations with one or more
+MutatorCoordinator instances (more on this later), and can finally
+commit or abort the transaction when the change set has been applied,
+which will release the lock with the meta store client. Finally you
+should close the mutation client to release any held resources.
+
+
+The MutatorClientBuilder is provided to simplify the construction of
+clients.
+
+
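+As a minimal sketch of the client-side lifecycle (mirroring the
+ExampleUseCase test class included with this change; the database and
+table names are hypothetical, metaStoreUri is assumed to be supplied by
+your process, and the LockFailureListener registration point is not
+shown in this excerpt):
+
+  MutatorClient client = new MutatorClientBuilder()
+      .addTable("example_db", "example_acid_table", true)  // true: create partitions as needed
+      .metaStoreUri(metaStoreUri)
+      .build();
+
+  Transaction transaction = client.newTransaction();
+  List<MutatorDestination> destinations = client.getDestinations();
+
+  transaction.begin();   // acquires locks and starts heartbeating
+  // ... hand 'destinations' to the workers and apply the mutations ...
+  transaction.commit();  // or transaction.abort() if the change set could not be applied
+
+  client.close();
+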
+
+WARNING: Hive doesn't currently have a deadlock detector (it is
+being worked on as part of HIVE-9675).
+This API could potentially deadlock with other stream writers or with
+SQL users.
+
+Writing data
+
+
+The MutatorCoordinator class is used to issue mutations to an ACID
+table. You will require at least one instance per table participating in
+the transaction. The target of a given instance is defined by the
+respective MutatorDestination used to construct the coordinator. It is
+recommended that a MutatorCoordinatorBuilder is used to simplify the
+construction process.
+
+
+
+Mutations can be applied by invoking the respective insert, update, and
+delete methods on the coordinator. These methods each take as parameters
+the target partition of the record and the mutated record. In the case
+of an unpartitioned table you should simply pass an empty list as the
+partition value. For inserts specifically, only the bucket id will be
+extracted from the RecordIdentifier; the transactionId and rowId will be
+ignored and replaced by appropriate values in the RecordUpdater.
+Additionally, in the case of deletes, everything but the
+RecordIdentifier in the record will be ignored and therefore it is often
+easier to simply submit the original record.
+
+
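+A minimal worker-side sketch (again mirroring ExampleUseCase; the
+mutatorFactory, destinations, partition values, and records are assumed
+to come from your own merge process):
+
+  MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+      .metaStoreUri(metaStoreUri)
+      .destination(destinations.get(0))
+      .mutatorFactory(mutatorFactory)
+      .build();
+
+  coordinator.insert(partitionValues, newRecord);      // ROW__ID bucketId is used; txnId/rowId are ignored
+  coordinator.update(partitionValues, changedRecord);  // the full mutated record is written
+  coordinator.delete(partitionValues, deletedRecord);  // only the ROW__ID is read
+
+  coordinator.close();
+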
+
+Caution: As mentioned previously, mutations must arrive in a specific
+order for the resultant table data to be consistent. Coordinators will
+verify a naturally ordered sequence of (lastTransactionId, rowId) and
+will throw an exception if this sequence is broken. This exception
+should almost certainly be escalated so that the transaction is aborted.
+This, along with the correct ordering of the data, is the responsibility
+of the client using the API.
+
+
+Dynamic Partition Creation:
+It is very likely to be desirable to have new partitions created
+automatically (say on an hourly basis). In such cases requiring the Hive
+admin to pre-create the necessary partitions may not be reasonable.
+Consequently the API allows coordinators to create partitions as needed
+(see: MutatorClientBuilder.addTable(String, String, boolean)). Partition
+creation is an atomic action; multiple coordinators can race to create
+the partition, but only one will succeed, so coordinator clients need
+not synchronize when creating a partition. The user of the coordinator
+process needs to be given write permissions on the Hive table in order
+to create partitions.
+
+Reading data
+
+
+Although this API is concerned with writing changes to data, as
+previously stated we'll almost certainly have to read the existing data
+first to obtain the relevant ROW__IDs. Therefore it is worth noting that
+reading ACID data in a robust and consistent manner requires the
+following:
+
+- Obtaining a valid transaction list from the meta store (ValidTxnList).
+- Acquiring a read-lock with the meta store and issuing heartbeats
+(LockImpl can help with this).
+- Configuring the OrcInputFormat and then reading the data. Make sure
+that you also pull in the ROW__ID values. See:
+AcidRecordReader.getRecordIdentifier.
+- Releasing the read-lock.
+
+
+
+Example
+
+So to recap, the sequence of events required to apply mutations
+to a dataset using the API is:
+
+- Create a MutatorClient to manage a transaction for the targeted ACID
+tables. Don't forget to register a LockFailureListener so that you can
+handle transaction failures.
+- Open a new Transaction with the client.
+- Get the MutatorDestinations from the client.
+- Begin the transaction.
+- Create at least one MutatorCoordinator for each destination.
+- Compute your mutation set (this is your ETL merge process).
+- Append bucket ids to insertion records. A BucketIdAppender can help
+here.
+- Group and sort your data appropriately.
+- Issue mutation events to your coordinators.
+- Close your coordinators.
+- Abort or commit the transaction.
+- Close your mutation client.
+
+
+See ExampleUseCase and TestMutations.testUpdatesAndDeletes() for some
+very simple usages.
+
+
+
+
+
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/AbstractMutator.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/AbstractMutator.java
new file mode 100644
index 0000000..2fedc8f
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/AbstractMutator.java
@@ -0,0 +1,77 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/** Base {@link Mutator} implementation. Creates a suitable {@link RecordUpdater} and delegates mutation events. */
+public abstract class AbstractMutator implements Mutator {
+
+ private final long transactionId;
+ private final Path partitionPath;
+ private final int bucketId;
+ private final Configuration configuration;
+ private RecordUpdater updater;
+
+ protected AbstractMutator(Configuration configuration, AcidOutputFormat<?, ?> outputFormat, long transactionId,
+ Path partitionPath, int bucketId) throws IOException {
+ this.configuration = configuration;
+ this.transactionId = transactionId;
+ this.partitionPath = partitionPath;
+ this.bucketId = bucketId;
+ updater = createRecordUpdater(outputFormat);
+ }
+
+ @Override
+ public void insert(Object record) throws IOException {
+ updater.insert(transactionId, record);
+ }
+
+ @Override
+ public void update(Object record) throws IOException {
+ updater.update(transactionId, record);
+ }
+
+ @Override
+ public void delete(Object record) throws IOException {
+ updater.delete(transactionId, record);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ updater.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ updater.close(false);
+ updater = null;
+ }
+
+ @Override
+ public String toString() {
+ return "AbstractMutator [transactionId=" + transactionId + ", partitionPath=" + partitionPath + ", bucketId="
+ + bucketId + "]";
+ }
+
+ protected RecordUpdater createRecordUpdater(AcidOutputFormat<?, ?> outputFormat) throws IOException {
+ return outputFormat.getRecordUpdater(partitionPath,
+ new AcidOutputFormat.Options(configuration)
+ .inspector(getObjectInspector())
+ .bucket(bucketId)
+ .minimumTransactionId(transactionId)
+ .maximumTransactionId(transactionId)
+ .recordIdColumn(getRecordIdColumn()));
+ }
+
+ /** Return the {@link ObjectInspector} that can be used to understand the records provided in mutation events. */
+ protected abstract ObjectInspector getObjectInspector();
+
+ /** Return the index of the {@link RecordIdentifier} column within the records provided in mutation events. */
+ protected abstract int getRecordIdColumn();
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdAppender.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdAppender.java
new file mode 100644
index 0000000..22cc2eb
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdAppender.java
@@ -0,0 +1,8 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+/** Computes and appends bucket ids to records that are due to be inserted. */
+public interface BucketIdAppender {
+
+ Object attachBucketIdToRecord(Object record);
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdAppenderImpl.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdAppenderImpl.java
new file mode 100644
index 0000000..7d05509
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdAppenderImpl.java
@@ -0,0 +1,75 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * Implementation of a {@link BucketIdAppender} that includes the logic required to calculate a bucket id from a record
+ * that is consistent with Hive's own internal computation scheme.
+ */
+public class BucketIdAppenderImpl implements BucketIdAppender {
+
+ private static final long INVALID_TRANSACTION_ID = -1L;
+ private static final long INVALID_ROW_ID = -1L;
+
+ private final SettableStructObjectInspector structObjectInspector;
+ private final StructField[] bucketFields;
+ private final int totalBuckets;
+ private final StructField recordIdentifierField;
+
+ /**
+ * Note that all column indexes are with respect to your record structure, not the Hive table structure. Bucket column
+ * indexes must be presented in the same order as they are in the Hive table definition.
+ */
+ public BucketIdAppenderImpl(ObjectInspector objectInspector, int recordIdColumn, int totalBuckets, int[] bucketColumns) {
+ this.totalBuckets = totalBuckets;
+ if (!(objectInspector instanceof SettableStructObjectInspector)) {
+ throw new IllegalArgumentException("Serious problem, expected a SettableStructObjectInspector, " + "but got a "
+ + objectInspector.getClass().getName());
+ }
+
+ if (bucketColumns.length < 1) {
+ throw new IllegalArgumentException("No bucket column indexes set.");
+ }
+ structObjectInspector = (SettableStructObjectInspector) objectInspector;
+ List<? extends StructField> structFields = structObjectInspector.getAllStructFieldRefs();
+
+ recordIdentifierField = structFields.get(recordIdColumn);
+
+ bucketFields = new StructField[bucketColumns.length];
+ for (int i = 0; i < bucketColumns.length; i++) {
+ int bucketColumnsIndex = bucketColumns[i];
+ bucketFields[i] = structFields.get(bucketColumnsIndex);
+ }
+ }
+
+ @Override
+ public Object attachBucketIdToRecord(Object record) {
+ int bucketId = computeBucketId(record);
+ RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, INVALID_ROW_ID);
+ structObjectInspector.setStructFieldData(record, recordIdentifierField, recordIdentifier);
+ return record;
+ }
+
+ /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
+ private int computeBucketId(Object record) {
+ int bucketId = 1;
+
+ for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
+ Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+ bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
+ }
+
+ if (bucketId < 0) {
+ bucketId = -1 * bucketId;
+ }
+
+ return bucketId % totalBuckets;
+ }
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
new file mode 100644
index 0000000..4324d25
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
@@ -0,0 +1,83 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class that can create new table partitions within the {@link IMetaStoreClient meta store}. */
+class CreatePartitionHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CreatePartitionHelper.class);
+
+ private final IMetaStoreClient metaStoreClient;
+ private final String databaseName;
+ private final String tableName;
+
+ CreatePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName) {
+ this.metaStoreClient = metaStoreClient;
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ }
+
+ /** Returns the expected {@link Path} for a given partition value. */
+ Path getPathForPartition(List<String> newPartitionValues) throws MutatorException {
+ try {
+ String location;
+ if (newPartitionValues.isEmpty()) {
+ location = metaStoreClient.getTable(databaseName, tableName).getSd().getLocation();
+ } else {
+ location = metaStoreClient.getPartition(databaseName, tableName, newPartitionValues).getSd().getLocation();
+ }
+ LOG.debug("Found path {} for partition {}", location, newPartitionValues);
+ return new Path(location);
+ } catch (NoSuchObjectException e) {
+ throw new MutatorException("Table not found '" + databaseName + "." + tableName + "'.", e);
+ } catch (TException e) {
+ throw new MutatorException("Failed to get path for partitions '" + newPartitionValues + "' on table '"
+ + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e);
+ }
+ }
+
+ /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. */
+ void createPartitionIfNotExists(List<String> newPartitionValues) throws MutatorException {
+ if (newPartitionValues.isEmpty()) {
+ return;
+ }
+
+ try {
+ LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues);
+ Table table = metaStoreClient.getTable(databaseName, tableName);
+
+ Partition partition = new Partition();
+ partition.setDbName(table.getDbName());
+ partition.setTableName(table.getTableName());
+ StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
+ partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
+ + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues));
+ partition.setSd(partitionSd);
+ partition.setValues(newPartitionValues);
+
+ metaStoreClient.add_partition(partition);
+ } catch (AlreadyExistsException e) {
+ LOG.debug("Partition already exists: {}.{}:{}", databaseName, tableName, newPartitionValues);
+ } catch (NoSuchObjectException e) {
+ LOG.error("Failed to create partition : " + newPartitionValues, e);
+ throw new MutatorException("Table not found '" + databaseName + "." + tableName + "'.", e);
+ } catch (TException e) {
+ LOG.error("Failed to create partition : " + newPartitionValues, e);
+ throw new MutatorException("Failed to create partition '" + newPartitionValues + "' on table '" + databaseName
+ + "." + tableName + "'", e);
+ }
+ }
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java
new file mode 100644
index 0000000..2e9afe9
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java
@@ -0,0 +1,20 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for submitting mutation events to a given partition and bucket in an ACID table. Requires records to arrive
+ * in the order defined by the {@link SequenceValidator}.
+ */
+public interface Mutator extends Closeable {
+
+ void insert(Object record) throws IOException;
+
+ void update(Object record) throws IOException;
+
+ void delete(Object record) throws IOException;
+
+ void flush() throws IOException;
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
new file mode 100644
index 0000000..d9bfbfc
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
@@ -0,0 +1,213 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorDestination;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Orchestrates the application of an ordered sequence of mutation events to a given ACID table. Events must be grouped
+ * by partition, then bucket and ordered by origTxnId, then rowId (TODO should this be the other way around?
+ * {@link org.apache.hadoop.hive.ql.io.RecordIdentifier#compareToInternal(RecordIdentifier)
+ * RecordIdentifier.compareToInternal(RecordIdentifier)} suggests that origTxnId takes precedence over rowId.) Ordering
+ * (but not grouping) is enforced by the {@link SequenceValidator}. Events are free to target any bucket and partition,
+ * including new partitions if {@link MutatorDestination#createPartitions()} is set. Internally the coordinator creates
+ * and closes {@link Mutator Mutators} as needed to write to the appropriate partition and bucket.
+ *
+ * When a mutation event has no declared target bucket (this applies to {@link #insert(List, Object) inserts} only) then
+ * the coordinator attempts to use the previous mutator instance (and hence bucket). However, it is not a good idea to
+ * bucket all inserts into the same bucket and so the coordinator will switch bucket to a random value when either a
+ * {@link #maxContiguousBucketInserts threshold} is reached or the target partition changes. Note that
+ * {@link #update(List, Object) updates} and {@link #delete(List, Object) deletes} will always be forwarded to their
+ * declared bucket.
+ *
+ * It is also recommended that {@link #insert(List, Object) insert} events be artificially assigned appropriate bucket
+ * ids in the preceding grouping phase so that they are nicely mixed in with other event types. Note that this is safe
+ * to do as any {@link RecordIdentifier RecordIdentifier} set on an insert record will be ignored by both the
+ * coordinator and the underlying {@link RecordUpdater}.
+ */
+public class MutatorCoordinator implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MutatorCoordinator.class);
+
+ private final IMetaStoreClient metaStoreClient;
+ private final MutatorFactory mutatorFactory;
+ private final SequenceValidator sequenceValidator;
+ private final MutatorDestination destination;
+ private final RecordInspector recordInspector;
+ private final CreatePartitionHelper partitionHelper;
+ private final AcidOutputFormat<?, ?> outputFormat;
+
+ private int bucketId;
+ private List<String> partitionValues;
+ private Path partitionPath;
+ private Mutator mutator;
+
+ MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
+ MutatorDestination destination) throws MutatorException {
+ this(metaStoreClient, configuration, mutatorFactory, new CreatePartitionHelper(metaStoreClient,
+ destination.getDatabaseName(), destination.getTableName()), new SequenceValidator(), destination);
+ }
+
+ // Visible for testing only
+ MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
+ CreatePartitionHelper partitionHelper, SequenceValidator sequenceValidator, MutatorDestination destination)
+ throws MutatorException {
+ this.metaStoreClient = metaStoreClient;
+ this.mutatorFactory = mutatorFactory;
+ this.partitionHelper = partitionHelper;
+ this.sequenceValidator = sequenceValidator;
+ this.destination = destination;
+ this.recordInspector = this.mutatorFactory.newRecordInspector();
+
+ bucketId = -1;
+ outputFormat = createOutputFormat(destination.getOutputFormatName(), configuration);
+ }
+
+ /**
+ * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+ *
+ * @throws MutatorException
+ */
+ public void insert(List<String> partitionValues, Object record) throws MutatorException {
+ reconfigureState(OperationType.INSERT, partitionValues, record);
+ try {
+ mutator.insert(record);
+ LOG.debug("Inserted into partition={}, record={}", partitionValues, record);
+ } catch (IOException e) {
+ throw new MutatorException("Failed to insert record '" + record + " using mutator '" + mutator + "'.", e);
+ }
+ }
+
+ /**
+ * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+ *
+ * @throws MutatorException
+ */
+ public void update(List<String> partitionValues, Object record) throws MutatorException {
+ reconfigureState(OperationType.UPDATE, partitionValues, record);
+ try {
+ mutator.update(record);
+ LOG.debug("Updated in partition={}, record={}", partitionValues, record);
+ } catch (IOException e) {
+ throw new MutatorException("Failed to update record '" + record + " using mutator '" + mutator + "'.", e);
+ }
+ }
+
+ /**
+ * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+ *
+ * @throws MutatorException
+ */
+ public void delete(List<String> partitionValues, Object record) throws MutatorException {
+ reconfigureState(OperationType.DELETE, partitionValues, record);
+ try {
+ mutator.delete(record);
+ LOG.debug("Deleted from partition={}, record={}", partitionValues, record);
+ } catch (IOException e) {
+ throw new MutatorException("Failed to delete record '" + record + " using mutator '" + mutator + "'.", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (mutator != null) {
+ mutator.close();
+ }
+ } finally {
+ metaStoreClient.close();
+ }
+ }
+
+ private void reconfigureState(OperationType operationType, List<String> newPartitionValues, Object record)
+ throws MutatorException {
+ RecordIdentifier newRecordIdentifier;
+ int newBucketId;
+ newRecordIdentifier = recordInspector.extractRecordIdentifier(record);
+ newBucketId = newRecordIdentifier.getBucketId();
+
+ if (newPartitionValues == null) {
+ newPartitionValues = Collections.emptyList();
+ }
+
+ try {
+ if (partitionHasChanged(newPartitionValues)) {
+ if (destination.createPartitions()) {
+ partitionHelper.createPartitionIfNotExists(newPartitionValues);
+ }
+ Path newPartitionPath = partitionHelper.getPathForPartition(newPartitionValues);
+ resetMutator(newBucketId, newPartitionValues, newPartitionPath);
+ } else if (bucketIdHasChanged(newBucketId)) {
+ resetMutator(newBucketId, partitionValues, partitionPath);
+ } else if (identiferOutOfSequence(operationType, newRecordIdentifier)) {
+ throw new MutatorException("Records not in sequence: state=" + sequenceValidator + ", record="
+ + newRecordIdentifier);
+ }
+ } catch (IOException e) {
+ throw new MutatorException("Failed to reset mutator when performing " + operationType + " of record: " + record,
+ e);
+ }
+ }
+
+ private void resetMutator(int newBucketId, List<String> newPartitionValues, Path newPartitionPath) throws IOException {
+ if (mutator != null) {
+ mutator.close();
+ }
+ sequenceValidator.reset();
+ mutator = mutatorFactory.newMutator(outputFormat, destination.getTransactionId(), newPartitionPath, newBucketId);
+ bucketId = newBucketId;
+ partitionValues = newPartitionValues;
+ partitionPath = newPartitionPath;
+ LOG.debug("Reset mutator: bucketId={},partition={}", bucketId, partitionValues);
+ }
+
+ private boolean partitionHasChanged(List<String> newPartitionValues) {
+ boolean partitionHasChanged = !Objects.equals(this.partitionValues, newPartitionValues);
+ if (partitionHasChanged) {
+ LOG.debug("Partition changed from={}, to={}", this.partitionValues, newPartitionValues);
+ }
+ return partitionHasChanged;
+ }
+
+ private boolean bucketIdHasChanged(int newBucketId) {
+ boolean bucketIdHasChanged = this.bucketId != newBucketId;
+ if (bucketIdHasChanged) {
+ LOG.debug("Bucket ID changed from={}, to={}", this.bucketId, newBucketId);
+ }
+ return bucketIdHasChanged;
+ }
+
+ private boolean identiferOutOfSequence(OperationType operationType, RecordIdentifier newRecordIdentifier) {
+ boolean identiferOutOfSequence = operationType != OperationType.INSERT
+ && !sequenceValidator.isInSequence(newRecordIdentifier);
+ if (identiferOutOfSequence) {
+ LOG.debug("Records not in sequence: state={}, record={}", sequenceValidator, newRecordIdentifier);
+ }
+ return identiferOutOfSequence;
+ }
+
+ @SuppressWarnings("unchecked")
+ private AcidOutputFormat<?, ?> createOutputFormat(String outputFormatName, HiveConf configuration)
+ throws MutatorException {
+ try {
+ return (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outputFormatName), configuration);
+ } catch (ClassNotFoundException e) {
+ throw new MutatorException("Could not locate class for '" + outputFormatName + "'.", e);
+ }
+ }
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
new file mode 100644
index 0000000..6ae09ed
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
@@ -0,0 +1,66 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory;
+import org.apache.hive.hcatalog.streaming.mutate.UgiMetaStoreClientFactory;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorDestination;
+
+/** Convenience class for building {@link MutatorCoordinator} instances. */
+public class MutatorCoordinatorBuilder {
+
+ private HiveConf configuration;
+ private MutatorFactory mutatorFactory;
+ private UserGroupInformation authenticatedUser;
+ private String metaStoreUri;
+ private MutatorDestination destination;
+
+ public MutatorCoordinatorBuilder configuration(HiveConf configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public MutatorCoordinatorBuilder authenticatedUser(UserGroupInformation authenticatedUser) {
+ this.authenticatedUser = authenticatedUser;
+ return this;
+ }
+
+ public MutatorCoordinatorBuilder metaStoreUri(String metaStoreUri) {
+ this.metaStoreUri = metaStoreUri;
+ return this;
+ }
+
+ /** Set the destination ACID table for this client. */
+ public MutatorCoordinatorBuilder destination(MutatorDestination destination) {
+ this.destination = destination;
+ return this;
+ }
+
+ public MutatorCoordinatorBuilder mutatorFactory(MutatorFactory mutatorFactory) {
+ this.mutatorFactory = mutatorFactory;
+ return this;
+ }
+
+ public MutatorCoordinator build() throws MutatorException, MetaException {
+ String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName();
+ boolean secureMode = authenticatedUser == null ? false : authenticatedUser.hasKerberosCredentials();
+
+ configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri);
+
+ IMetaStoreClient metaStoreClient;
+ try {
+ metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, user, secureMode)
+ .newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+ } catch (IOException e) {
+ throw new MutatorException("Could not create meta store client.", e);
+ }
+
+ return new MutatorCoordinator(metaStoreClient, configuration, mutatorFactory, destination);
+ }
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorException.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorException.java
new file mode 100644
index 0000000..754c5b5
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+public class MutatorException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public MutatorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MutatorException(String message) {
+ super(message);
+ }
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
new file mode 100644
index 0000000..e9e89c8
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+
+public interface MutatorFactory {
+
+ Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException;
+
+ RecordInspector newRecordInspector();
+
+ BucketIdAppender newBucketIdAppender(int totalBuckets);
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java
new file mode 100644
index 0000000..5ecb1bb
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java
@@ -0,0 +1,7 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+enum OperationType {
+ INSERT,
+ UPDATE,
+ DELETE;
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java
new file mode 100644
index 0000000..11ef0dd
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+
+/** Provide a means to extract {@link RecordIdentifier} from record objects. */
+public interface RecordInspector {
+
+ /** Get the {@link RecordIdentifier} from the record - to be used for updates and deletes only. */
+ RecordIdentifier extractRecordIdentifier(Object record);
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java
new file mode 100644
index 0000000..18ee458
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java
@@ -0,0 +1,45 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Standard {@link RecordInspector} implementation that uses the supplied {@link ObjectInspector} and
+ * {@link AcidOutputFormat.Options#recordIdColumn(int) record id column} to extract {@link RecordIdentifier
+ * RecordIdentifiers}, and calculate bucket ids from records.
+ */
+public class RecordInspectorImpl implements RecordInspector {
+
+ private final StructObjectInspector structObjectInspector;
+ private final StructField recordIdentifierField;
+
+ /**
+ * Note that all column indexes are with respect to your record structure, not the Hive table structure.
+ */
+ public RecordInspectorImpl(ObjectInspector objectInspector, int recordIdColumn) {
+ if (!(objectInspector instanceof StructObjectInspector)) {
+ throw new IllegalArgumentException("Serious problem, expected a StructObjectInspector, " + "but got a "
+ + objectInspector.getClass().getName());
+ }
+
+ structObjectInspector = (StructObjectInspector) objectInspector;
+ List<? extends StructField> structFields = structObjectInspector.getAllStructFieldRefs();
+ recordIdentifierField = structFields.get(recordIdColumn);
+ }
+
+ public RecordIdentifier extractRecordIdentifier(Object record) {
+ return (RecordIdentifier) structObjectInspector.getStructFieldData(record, recordIdentifierField);
+ }
+
+ @Override
+ public String toString() {
+ return "RecordInspectorImpl [structObjectInspector=" + structObjectInspector + ", recordIdentifierField="
+ + recordIdentifierField + "]";
+ }
+
+}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
new file mode 100644
index 0000000..ba3686b
--- /dev/null
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
@@ -0,0 +1,54 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that the sequence of {@link RecordIdentifier RecordIdentifiers} are in a valid order for insertion into an
+ * ACID delta file in a given partition and bucket.
+ */
+class SequenceValidator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SequenceValidator.class);
+
+ private Long lastTxId;
+ private Long lastRowId;
+
+ SequenceValidator() {
+ }
+
+ /**
+ * TODO: should order be tx,row or row,tx? {@link
+ * org.apache.hadoop.hive.ql.io.RecordIdentifier#compareToInternal(RecordIdentifier)
+ * RecordIdentifier.compareToInternal(RecordIdentifier)} suggests that txId takes precedence over rowId.
+ */
+ boolean isInSequence(RecordIdentifier recordIdentifier) {
+ if (lastTxId != null && recordIdentifier.getTransactionId() < lastTxId) {
+ LOG.debug("Non-sequential transaction ID. Expected >{}, recordIdentifier={}", lastTxId, recordIdentifier);
+ return false;
+ } else if (lastTxId != null && recordIdentifier.getTransactionId() == lastTxId && lastRowId != null
+ && recordIdentifier.getRowId() <= lastRowId) {
+ LOG.debug("Non-sequential row ID. Expected >{}, recordIdentifier={}", lastRowId, recordIdentifier);
+ return false;
+ }
+ lastTxId = recordIdentifier.getTransactionId();
+ lastRowId = recordIdentifier.getRowId();
+ return true;
+ }
+
+ /**
+ * Validator must be reset for each new partition and or bucket.
+ */
+ void reset() {
+ lastTxId = null;
+ lastRowId = null;
+ LOG.debug("reset");
+ }
+
+ @Override
+ public String toString() {
+ return "SequenceValidator [lastTxId=" + lastTxId + ", lastRowId=" + lastRowId + "]";
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java
new file mode 100644
index 0000000..e5fc08c
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java
@@ -0,0 +1,82 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.util.List;
+
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorDestination;
+import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdAppender;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+
+public class ExampleUseCase {
+
+ private String metaStoreUri;
+ private String databaseName;
+ private String tableName;
+ private boolean createPartitions = true;
+ private List<String> partitionValues1, partitionValues2, partitionValues3;
+ private Object record1, record2, record3;
+ private MutatorFactory mutatorFactory;
+
+ /* This is an illustration, not a functioning example. */
+ public void example() throws Exception {
+ // CLIENT/TOOL END
+ //
+ // Singleton instance in the job client
+
+ // Create a client to manage our transaction
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(databaseName, tableName, createPartitions)
+ .metaStoreUri(metaStoreUri)
+ .build();
+
+ // Get the transaction
+ Transaction transaction = client.newTransaction();
+
+ // Get serializable details of the destination tables
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ transaction.begin();
+
+ // CLUSTER / WORKER END
+ //
+ // Job submitted to the cluster
+ //
+
+ BucketIdAppender bucketIdAppender = mutatorFactory.newBucketIdAppender(destinations.get(0).getTotalBuckets());
+ record1 = bucketIdAppender.attachBucketIdToRecord(record1);
+
+ // --------------------------------------------------------------
+ // DATA SHOULD GET SORTED BY YOUR ETL/MERGE PROCESS HERE
+ //
+ // Group the data by (partitionValues, ROW__ID.bucketId)
+ // Order the groups by (ROW__ID.lastTransactionId, ROW__ID.rowId)
+ // --------------------------------------------------------------
+
+ // One of these runs at the output of each reducer
+ //
+ MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ coordinator.insert(partitionValues1, record1);
+ coordinator.update(partitionValues2, record2);
+ coordinator.delete(partitionValues3, record3);
+
+ coordinator.close();
+
+ // CLIENT/TOOL END
+ //
+ // The tasks have completed, control is back at the tool
+
+ transaction.commit();
+
+ client.close();
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java
new file mode 100644
index 0000000..0d87a31
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.io.Text;
+
+public class MutableRecord {
+
+ // Column 0
+ public final int id;
+ // Column 1
+ public final Text msg;
+ // Column 2
+ public RecordIdentifier rowId;
+
+ public MutableRecord(int id, String msg, RecordIdentifier rowId) {
+ this.id = id;
+ this.msg = new Text(msg);
+ this.rowId = rowId;
+ }
+
+ public MutableRecord(int id, String msg) {
+ this.id = id;
+ this.msg = new Text(msg);
+ rowId = null;
+ }
+
+ @Override
+ public String toString() {
+ return "MutableRecord [id=" + id + ", msg=" + msg + ", rowId=" + rowId + "]";
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
new file mode 100644
index 0000000..48e8867
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
@@ -0,0 +1,69 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdAppenderImpl;
+import org.apache.hive.hcatalog.streaming.mutate.worker.AbstractMutator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdAppender;
+import org.apache.hive.hcatalog.streaming.mutate.worker.Mutator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.worker.RecordInspector;
+import org.apache.hive.hcatalog.streaming.mutate.worker.RecordInspectorImpl;
+
+public class ReflectiveMutatorFactory implements MutatorFactory {
+
+ private final int recordIdColumn;
+ private final ObjectInspector objectInspector;
+ private final Configuration configuration;
+ private final int[] bucketColumnIndexes;
+
+ public ReflectiveMutatorFactory(Configuration configuration, Class<?> recordClass, int recordIdColumn,
+ int[] bucketColumnIndexes) {
+ this.configuration = configuration;
+ this.recordIdColumn = recordIdColumn;
+ this.bucketColumnIndexes = bucketColumnIndexes;
+ objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(recordClass,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ @Override
+ public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId)
+ throws IOException {
+ return new ReflectiveMutator(configuration, outputFormat, transactionId, partitionPath, bucketId);
+ }
+
+ @Override
+ public RecordInspector newRecordInspector() {
+ return new RecordInspectorImpl(objectInspector, recordIdColumn);
+ }
+
+ @Override
+ public BucketIdAppender newBucketIdAppender(int totalBuckets) {
+ return new BucketIdAppenderImpl(objectInspector, recordIdColumn, totalBuckets, bucketColumnIndexes);
+ }
+
+ private class ReflectiveMutator extends AbstractMutator {
+
+ protected ReflectiveMutator(Configuration configuration, AcidOutputFormat<?, ?> outputFormat, long transactionId,
+ Path partitionPath, int bucketId) throws IOException {
+ super(configuration, outputFormat, transactionId, partitionPath, bucketId);
+ }
+
+ @Override
+ protected ObjectInspector getObjectInspector() {
+ return objectInspector;
+ }
+
+ @Override
+ protected int getRecordIdColumn() {
+ return recordIdColumn;
+ }
+
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
new file mode 100644
index 0000000..477ed8c
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.TException;
+
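+/**
+ * Test helper that inspects the ACID state of a table or partition, allowing assertions on the delta files
+ * present, the transaction id range they cover, and the rows that can be read back from them.
+ */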
+public class StreamingAssert {
+
+ public static class Factory {
+ private IMetaStoreClient metaStoreClient;
+ private final HiveConf conf;
+
+ public Factory(IMetaStoreClient metaStoreClient, HiveConf conf) {
+ this.metaStoreClient = metaStoreClient;
+ this.conf = conf;
+ }
+
+ public StreamingAssert newStreamingAssert(Table table) throws Exception {
+ return newStreamingAssert(table, Collections.<String> emptyList());
+ }
+
+ public StreamingAssert newStreamingAssert(Table table, List<String> partition) throws Exception {
+ return new StreamingAssert(metaStoreClient, conf, table, partition);
+ }
+ }
+
+ private Table table;
+ private List<String> partition;
+ private IMetaStoreClient metaStoreClient;
+ private Directory dir;
+ private ValidTxnList txns;
+ private List<AcidUtils.ParsedDelta> currentDeltas;
+ private long min;
+ private long max;
+ private Path partitionLocation;
+
+ StreamingAssert(IMetaStoreClient metaStoreClient, HiveConf conf, Table table, List<String> partition)
+ throws Exception {
+ this.metaStoreClient = metaStoreClient;
+ this.table = table;
+ this.partition = partition;
+
+ txns = metaStoreClient.getValidTxns();
+ partitionLocation = getPartitionLocation();
+ dir = AcidUtils.getAcidState(partitionLocation, conf, txns);
+ assertEquals(0, dir.getObsolete().size());
+ assertEquals(0, dir.getOriginalFiles().size());
+
+ currentDeltas = dir.getCurrentDirectories();
+ min = Long.MAX_VALUE;
+ max = Long.MIN_VALUE;
+ System.out.println("Files found: ");
+ for (AcidUtils.ParsedDelta parsedDelta : currentDeltas) {
+ System.out.println(parsedDelta.getPath().toString());
+ max = Math.max(parsedDelta.getMaxTransaction(), max);
+ min = Math.min(parsedDelta.getMinTransaction(), min);
+ }
+ }
+
+ public void assertExpectedFileCount(int expectedFileCount) {
+ assertEquals(expectedFileCount, currentDeltas.size());
+ }
+
+ public void assertNothingWritten() {
+ assertExpectedFileCount(0);
+ }
+
+ public void assertMinTransactionId(long expectedMinTransactionId) {
+ if (currentDeltas.isEmpty()) {
+ throw new AssertionError("No data");
+ }
+ assertEquals(expectedMinTransactionId, min);
+ }
+
+ public void assertMaxTransactionId(long expectedMaxTransactionId) {
+ if (currentDeltas.isEmpty()) {
+ throw new AssertionError("No data");
+ }
+ assertEquals(expectedMaxTransactionId, max);
+ }
+
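+ /** Reads the partition back through {@link OrcInputFormat} as an ACID read, pairing each row with its {@link RecordIdentifier}. */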
+ List<Record> readRecords() throws Exception {
+ if (currentDeltas.isEmpty()) {
+ throw new AssertionError("No data");
+ }
+ InputFormat<NullWritable, OrcStruct> inputFormat = new OrcInputFormat();
+ JobConf job = new JobConf();
+ job.set("mapred.input.dir", partitionLocation.toString());
+ job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+ job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+ InputSplit[] splits = inputFormat.getSplits(job, 1);
+ assertEquals(1, splits.length);
+
+ final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
+ .getRecordReader(splits[0], job, Reporter.NULL);
+
+ NullWritable key = recordReader.createKey();
+ OrcStruct value = recordReader.createValue();
+
+ List<Record> records = new ArrayList<>();
+ while (recordReader.next(key, value)) {
+ RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
+ Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+ recordIdentifier.getBucketId(), recordIdentifier.getRowId()), value.toString());
+ System.out.println(record);
+ records.add(record);
+ }
+ recordReader.close();
+ return records;
+ }
+
+ private Path getPartitionLocation() throws NoSuchObjectException, MetaException, TException {
+ Path location;
+ if (partition.isEmpty()) {
+ location = new Path(table.getSd().getLocation());
+ } else {
+ // TODO: calculate this instead. Just because we're writing to the location doesn't mean that it'll
+ // always be wanted in the meta store right away.
+ List<Partition> partitionEntries = metaStoreClient.listPartitions(table.getDbName(), table.getTableName(),
+ partition, (short) 1);
+ location = new Path(partitionEntries.get(0).getSd().getLocation());
+ }
+ return location;
+ }
+
+ public static class Record {
+ private RecordIdentifier recordIdentifier;
+ private String row;
+
+ Record(RecordIdentifier recordIdentifier, String row) {
+ this.recordIdentifier = recordIdentifier;
+ this.row = row;
+ }
+
+ public RecordIdentifier getRecordIdentifier() {
+ return recordIdentifier;
+ }
+
+ public String getRow() {
+ return row;
+ }
+
+ @Override
+ public String toString() {
+ return "Record [recordIdentifier=" + recordIdentifier + ", row=" + row + "]";
+ }
+
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java
new file mode 100644
index 0000000..a8f38f1
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java
@@ -0,0 +1,238 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.thrift.TException;
+
+public class StreamingTestUtils {
+
+ public HiveConf newHiveConf(String metaStoreUri) {
+ HiveConf conf = new HiveConf(this.getClass());
+ conf.set("fs.raw.impl", RawFileSystem.class.getName());
+ if (metaStoreUri != null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+ }
+ conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ return conf;
+ }
+
+ public void prepareTransactionDatabase(HiveConf conf) throws Exception {
+ TxnDbUtil.setConfValues(conf);
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+ }
+
+ public IMetaStoreClient newMetaStoreClient(HiveConf conf) throws Exception {
+ return new HiveMetaStoreClient(conf);
+ }
+
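+ /**
+ * Local file system registered under the raw:// scheme so that test warehouse paths resolve without HDFS;
+ * getFileStatus approximates POSIX permission bits from the local file's read/write/execute flags.
+ */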
+ public static class RawFileSystem extends RawLocalFileSystem {
+ private static final URI NAME;
+ static {
+ try {
+ NAME = new URI("raw:///");
+ } catch (URISyntaxException se) {
+ throw new IllegalArgumentException("bad uri", se);
+ }
+ }
+
+ @Override
+ public URI getUri() {
+ return NAME;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ File file = pathToFile(path);
+ if (!file.exists()) {
+ throw new FileNotFoundException("Can't find " + path);
+ }
+ // get close enough
+ short mod = 0;
+ if (file.canRead()) {
+ mod |= 0444;
+ }
+ if (file.canWrite()) {
+ mod |= 0200;
+ }
+ if (file.canExecute()) {
+ mod |= 0111;
+ }
+ return new FileStatus(file.length(), file.isDirectory(), 1, 1024, file.lastModified(), file.lastModified(),
+ FsPermission.createImmutable(mod), "owen", "users", path);
+ }
+ }
+
+ public static DatabaseBuilder databaseBuilder(File warehouseFolder) {
+ return new DatabaseBuilder(warehouseFolder);
+ }
+
+ public static class DatabaseBuilder {
+
+ private Database database;
+ private File warehouseFolder;
+
+ public DatabaseBuilder(File warehouseFolder) {
+ this.warehouseFolder = warehouseFolder;
+ database = new Database();
+ }
+
+ public DatabaseBuilder name(String name) {
+ database.setName(name);
+ File databaseFolder = new File(warehouseFolder, name + ".db");
+ String databaseLocation = "raw://" + databaseFolder.toURI().getPath();
+ database.setLocationUri(databaseLocation);
+ return this;
+ }
+
+ public Database dropAndCreate(IMetaStoreClient metaStoreClient) throws Exception {
+ try {
+ for (String table : metaStoreClient.listTableNamesByFilter(database.getName(), "", (short) -1)) {
+ metaStoreClient.dropTable(database.getName(), table, true, true);
+ }
+ metaStoreClient.dropDatabase(database.getName());
+ } catch (TException e) {
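+ // ignore failures from dropping a database or tables that may not yet exist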
+ }
+ metaStoreClient.createDatabase(database);
+ return database;
+ }
+
+ }
+
+ public static TableBuilder tableBuilder(Database database) {
+ return new TableBuilder(database);
+ }
+
+ public static class TableBuilder {
+
+ private Table table;
+ private StorageDescriptor sd;
+ private SerDeInfo serDeInfo;
+ private Database database;
+ private List<List<String>> partitions;
+ private List<String> columnNames;
+ private List<String> columnTypes;
+ private List<String> partitionKeys;
+
+ public TableBuilder(Database database) {
+ this.database = database;
+ partitions = new ArrayList<>();
+ columnNames = new ArrayList<>();
+ columnTypes = new ArrayList<>();
+ partitionKeys = Collections.emptyList();
+ table = new Table();
+ table.setDbName(database.getName());
+ table.setTableType(TableType.MANAGED_TABLE.toString());
+ Map<String, String> tableParams = new HashMap<>();
+ tableParams.put("transactional", Boolean.TRUE.toString());
+ table.setParameters(tableParams);
+
+ sd = new StorageDescriptor();
+ sd.setInputFormat(HiveInputFormat.class.getName());
+ sd.setOutputFormat(OrcOutputFormat.class.getName());
+ sd.setNumBuckets(1);
+ table.setSd(sd);
+
+ serDeInfo = new SerDeInfo();
+ serDeInfo.setParameters(new HashMap<String, String>());
+ serDeInfo.getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+ serDeInfo.setSerializationLib(OrcSerde.class.getName());
+ sd.setSerdeInfo(serDeInfo);
+ }
+
+ public TableBuilder name(String name) {
+ sd.setLocation(database.getLocationUri() + Path.SEPARATOR + name);
+ table.setTableName(name);
+ serDeInfo.setName(name);
+ return this;
+ }
+
+ public TableBuilder buckets(int buckets) {
+ sd.setNumBuckets(buckets);
+ return this;
+ }
+
+ public TableBuilder addColumn(String columnName, String columnType) {
+ columnNames.add(columnName);
+ columnTypes.add(columnType);
+ return this;
+ }
+
+ public TableBuilder partitionKeys(String... partitionKeys) {
+ this.partitionKeys = Arrays.asList(partitionKeys);
+ return this;
+ }
+
+ public TableBuilder addPartition(String... partitionValues) {
+ partitions.add(Arrays.asList(partitionValues));
+ return this;
+ }
+
+ public TableBuilder addPartition(List<String> partitionValues) {
+ partitions.add(partitionValues);
+ return this;
+ }
+
+ public Table create(IMetaStoreClient metaStoreClient) throws Exception {
+ List<FieldSchema> fields = new ArrayList<>(columnNames.size());
+ for (int i = 0; i < columnNames.size(); i++) {
+ fields.add(new FieldSchema(columnNames.get(i), columnTypes.get(i), ""));
+ }
+ sd.setCols(fields);
+
+ if (!partitionKeys.isEmpty()) {
+ List<FieldSchema> partitionFields = new ArrayList<>();
+ for (String partitionKey : partitionKeys) {
+ partitionFields.add(new FieldSchema(partitionKey, serdeConstants.STRING_TYPE_NAME, ""));
+ }
+ table.setPartitionKeys(partitionFields);
+ }
+ metaStoreClient.createTable(table);
+
+ for (List<String> partitionValues : partitions) {
+ Partition partition = new Partition();
+ partition.setDbName(database.getName());
+ partition.setTableName(table.getTableName());
+ StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
+ partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
+ + Warehouse.makePartName(table.getPartitionKeys(), partitionValues));
+ partition.setSd(partitionSd);
+ partition.setValues(partitionValues);
+
+ metaStoreClient.add_partition(partition);
+ }
+ return table;
+ }
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
new file mode 100644
index 0000000..d2105c9
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import static org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState.ABORTED;
+import static org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState.COMMITTED;
+import static org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.databaseBuilder;
+import static org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.tableBuilder;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hive.hcatalog.streaming.TestStreaming;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Factory;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Record;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.TableBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorDestination;
+import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdAppender;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test is based on {@link TestStreaming} and has a similar core set of tests to ensure that basic transactional
+ * behaviour is as expected along the {@link RecordMutator} code path. It is complemented with a set of tests covering
+ * the use of update and delete operations.
+ */
+public class TestMutations {
+
+ private static final List<String> EUROPE_FRANCE = Arrays.asList("Europe", "France");
+ private static final List<String> EUROPE_UK = Arrays.asList("Europe", "UK");
+ private static final List<String> ASIA_INDIA = Arrays.asList("Asia", "India");
+ // id
+ private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 };
+ private static final int RECORD_ID_COLUMN = 2;
+
+ @Rule
+ public TemporaryFolder warehouseFolder = new TemporaryFolder();
+
+ private StreamingTestUtils testUtils = new StreamingTestUtils();
+ private HiveConf conf;
+ private IMetaStoreClient metaStoreClient;
+ private String metaStoreUri;
+ private Database database;
+ private TableBuilder partitionedTableBuilder;
+ private TableBuilder unpartitionedTableBuilder;
+ private Factory assertionFactory;
+
+ public TestMutations() throws Exception {
+ conf = testUtils.newHiveConf(metaStoreUri);
+ testUtils.prepareTransactionDatabase(conf);
+ metaStoreClient = testUtils.newMetaStoreClient(conf);
+ assertionFactory = new StreamingAssert.Factory(metaStoreClient, conf);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ database = databaseBuilder(warehouseFolder.getRoot()).name("testing").dropAndCreate(metaStoreClient);
+
+ partitionedTableBuilder = tableBuilder(database)
+ .name("partitioned")
+ .addColumn("id", "int")
+ .addColumn("msg", "string")
+ .partitionKeys("continent", "country");
+
+ unpartitionedTableBuilder = tableBuilder(database)
+ .name("unpartitioned")
+ .addColumn("id", "int")
+ .addColumn("msg", "string");
+ }
+
+ @Test
+ public void testTransactionBatchEmptyCommitPartitioned() throws Exception {
+ Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), true)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction transaction = client.newTransaction();
+
+ transaction.begin();
+
+ transaction.commit();
+ assertThat(transaction.getState(), is(COMMITTED));
+ client.close();
+ }
+
+ @Test
+ public void testTransactionBatchEmptyCommitUnpartitioned() throws Exception {
+ Table table = unpartitionedTableBuilder.create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), false)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction transaction = client.newTransaction();
+
+ transaction.begin();
+
+ transaction.commit();
+ assertThat(transaction.getState(), is(COMMITTED));
+ client.close();
+ }
+
+ @Test
+ public void testTransactionBatchEmptyAbortPartitioned() throws Exception {
+ Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), true)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction transaction = client.newTransaction();
+
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ transaction.begin();
+
+ MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+ BUCKET_COLUMN_INDEXES);
+ MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ coordinator.close();
+
+ transaction.abort();
+ assertThat(transaction.getState(), is(ABORTED));
+ client.close();
+ }
+
+ @Test
+ public void testTransactionBatchEmptyAbortUnpartitioned() throws Exception {
+ Table table = unpartitionedTableBuilder.create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), false)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction transaction = client.newTransaction();
+
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ transaction.begin();
+
+ MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+ BUCKET_COLUMN_INDEXES);
+ MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ coordinator.close();
+
+ transaction.abort();
+ assertThat(transaction.getState(), is(ABORTED));
+ client.close();
+ }
+
+ @Test
+ public void testTransactionBatchCommitPartitioned() throws Exception {
+ Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), true)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction transaction = client.newTransaction();
+
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ transaction.begin();
+
+ MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+ BUCKET_COLUMN_INDEXES);
+ MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ BucketIdAppender bucketIdAppender = mutatorFactory.newBucketIdAppender(destinations.get(0).getTotalBuckets());
+ MutableRecord record = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(1,
+ "Hello streaming"));
+ coordinator.insert(ASIA_INDIA, record);
+ coordinator.close();
+
+ transaction.commit();
+
+ StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+ streamingAssertions.assertMinTransactionId(1L);
+ streamingAssertions.assertMaxTransactionId(1L);
+ streamingAssertions.assertExpectedFileCount(1);
+
+ List<Record> readRecords = streamingAssertions.readRecords();
+ assertThat(readRecords.size(), is(1));
+ assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+ assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+ assertThat(transaction.getState(), is(COMMITTED));
+ client.close();
+ }
+
+ @Test
+ public void testMulti() throws Exception {
+ Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), true)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction transaction = client.newTransaction();
+
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ transaction.begin();
+
+ MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+ BUCKET_COLUMN_INDEXES);
+ MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ BucketIdAppender bucketIdAppender = mutatorFactory.newBucketIdAppender(destinations.get(0).getTotalBuckets());
+ MutableRecord asiaIndiaRecord1 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(1,
+ "Hello streaming"));
+ MutableRecord europeUkRecord1 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(2,
+ "Hello streaming"));
+ MutableRecord europeFranceRecord1 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(3,
+ "Hello streaming"));
+ MutableRecord europeFranceRecord2 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(4,
+ "Bonjour streaming"));
+
+ coordinator.insert(ASIA_INDIA, asiaIndiaRecord1);
+ coordinator.insert(EUROPE_UK, europeUkRecord1);
+ coordinator.insert(EUROPE_FRANCE, europeFranceRecord1);
+ coordinator.insert(EUROPE_FRANCE, europeFranceRecord2);
+ coordinator.close();
+
+ transaction.commit();
+
+ // ASIA_INDIA
+ StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+ streamingAssertions.assertMinTransactionId(1L);
+ streamingAssertions.assertMaxTransactionId(1L);
+ streamingAssertions.assertExpectedFileCount(1);
+
+ List<Record> readRecords = streamingAssertions.readRecords();
+ assertThat(readRecords.size(), is(1));
+ assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+ assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+ // EUROPE_UK
+ streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
+ streamingAssertions.assertMinTransactionId(1L);
+ streamingAssertions.assertMaxTransactionId(1L);
+ streamingAssertions.assertExpectedFileCount(1);
+
+ readRecords = streamingAssertions.readRecords();
+ assertThat(readRecords.size(), is(1));
+ assertThat(readRecords.get(0).getRow(), is("{2, Hello streaming}"));
+ assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+ // EUROPE_FRANCE
+ streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
+ streamingAssertions.assertMinTransactionId(1L);
+ streamingAssertions.assertMaxTransactionId(1L);
+ streamingAssertions.assertExpectedFileCount(1);
+
+ readRecords = streamingAssertions.readRecords();
+ assertThat(readRecords.size(), is(2));
+ assertThat(readRecords.get(0).getRow(), is("{3, Hello streaming}"));
+ assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+ assertThat(readRecords.get(1).getRow(), is("{4, Bonjour streaming}"));
+ assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+ client.close();
+ }
+
+ @Test
+ public void testTransactionBatchCommitUnpartitioned() throws Exception {
+ Table table = unpartitionedTableBuilder.create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), false)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction transaction = client.newTransaction();
+
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ transaction.begin();
+
+ MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+ BUCKET_COLUMN_INDEXES);
+ MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ BucketIdAppender bucketIdAppender = mutatorFactory.newBucketIdAppender(destinations.get(0).getTotalBuckets());
+ MutableRecord record = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(1,
+ "Hello streaming"));
+
+ coordinator.insert(Collections.<String> emptyList(), record);
+ coordinator.close();
+
+ transaction.commit();
+
+ StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table);
+ streamingAssertions.assertMinTransactionId(1L);
+ streamingAssertions.assertMaxTransactionId(1L);
+ streamingAssertions.assertExpectedFileCount(1);
+
+ List<Record> readRecords = streamingAssertions.readRecords();
+ assertThat(readRecords.size(), is(1));
+ assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+ assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+ assertThat(transaction.getState(), is(COMMITTED));
+ client.close();
+ }
+
+ @Test
+ public void testTransactionBatchAbort() throws Exception {
+ Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), true)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction transaction = client.newTransaction();
+
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ transaction.begin();
+
+ MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+ BUCKET_COLUMN_INDEXES);
+ MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ BucketIdAppender bucketIdAppender = mutatorFactory.newBucketIdAppender(destinations.get(0).getTotalBuckets());
+ MutableRecord record1 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(1,
+ "Hello streaming"));
+ MutableRecord record2 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(2,
+ "Welcome to streaming"));
+
+ coordinator.insert(ASIA_INDIA, record1);
+ coordinator.insert(ASIA_INDIA, record2);
+ coordinator.close();
+
+ transaction.abort();
+
+ assertThat(transaction.getState(), is(ABORTED));
+
+ client.close();
+
+ StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+ streamingAssertions.assertNothingWritten();
+ }
+
+ @Test
+ public void testUpdatesAndDeletes() throws Exception {
+ // Set up some base data then stream some inserts/updates/deletes to a number of partitions
+ MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+ BUCKET_COLUMN_INDEXES);
+
+ // INSERT DATA
+ //
+ Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).addPartition(EUROPE_FRANCE).create(metaStoreClient);
+
+ MutatorClient client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), true)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction insertTransaction = client.newTransaction();
+
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ insertTransaction.begin();
+
+ MutatorCoordinator insertCoordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ BucketIdAppender bucketIdAppender = mutatorFactory.newBucketIdAppender(destinations.get(0).getTotalBuckets());
+ MutableRecord asiaIndiaRecord1 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(1,
+ "Namaste streaming 1"));
+ MutableRecord asiaIndiaRecord2 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(2,
+ "Namaste streaming 2"));
+ MutableRecord europeUkRecord1 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(3,
+ "Hello streaming 1"));
+ MutableRecord europeUkRecord2 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(4,
+ "Hello streaming 2"));
+ MutableRecord europeFranceRecord1 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(5,
+ "Bonjour streaming 1"));
+ MutableRecord europeFranceRecord2 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(6,
+ "Bonjour streaming 2"));
+
+ insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord1);
+ insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord2);
+ insertCoordinator.insert(EUROPE_UK, europeUkRecord1);
+ insertCoordinator.insert(EUROPE_UK, europeUkRecord2);
+ insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord1);
+ insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord2);
+ insertCoordinator.close();
+
+ insertTransaction.commit();
+
+ assertThat(insertTransaction.getState(), is(COMMITTED));
+ client.close();
+
+ // MUTATE DATA
+ //
+ client = new MutatorClientBuilder()
+ .addTable(table.getDbName(), table.getTableName(), true)
+ .metaStoreUri(metaStoreUri)
+ .build();
+ client.connect();
+
+ Transaction mutateTransaction = client.newTransaction();
+
+ destinations = client.getDestinations();
+
+ mutateTransaction.begin();
+
+ MutatorCoordinator mutateCoordinator = new MutatorCoordinatorBuilder()
+ .metaStoreUri(metaStoreUri)
+ .destination(destinations.get(0))
+ .mutatorFactory(mutatorFactory)
+ .build();
+
+ bucketIdAppender = mutatorFactory.newBucketIdAppender(destinations.get(0).getTotalBuckets());
+ MutableRecord asiaIndiaRecord3 = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(20,
+ "Namaste streaming 3"));
+
+ mutateCoordinator.update(ASIA_INDIA, new MutableRecord(2, "UPDATED: Namaste streaming 2", new RecordIdentifier(1L,
+ 0, 1L)));
+ mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3);
+ mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0L)));
+ mutateCoordinator.delete(EUROPE_FRANCE,
+ new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0L)));
+ mutateCoordinator.update(EUROPE_FRANCE, new MutableRecord(6, "UPDATED: Bonjour streaming 2", new RecordIdentifier(
+ 1L, 0, 1L)));
+ mutateCoordinator.close();
+
+ mutateTransaction.commit();
+
+ assertThat(mutateTransaction.getState(), is(COMMITTED));
+
+ StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+ indiaAssertions.assertMinTransactionId(1L);
+ indiaAssertions.assertMaxTransactionId(2L);
+ List<Record> indiaRecords = indiaAssertions.readRecords();
+ assertThat(indiaRecords.size(), is(3));
+ assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
+ assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+ assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}"));
+ assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+ assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
+ assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, 0, 0L)));
+
+ StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
+ ukAssertions.assertMinTransactionId(1L);
+ ukAssertions.assertMaxTransactionId(2L);
+ List<Record> ukRecords = ukAssertions.readRecords();
+ assertThat(ukRecords.size(), is(1));
+ assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
+ assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+ StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
+ franceAssertions.assertMinTransactionId(1L);
+ franceAssertions.assertMaxTransactionId(2L);
+ List<Record> franceRecords = franceAssertions.readRecords();
+ assertThat(franceRecords.size(), is(1));
+ assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}"));
+ assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+ client.close();
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
new file mode 100644
index 0000000..a12e066
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
@@ -0,0 +1,168 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hive.hcatalog.streaming.ConnectionError;
+import org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMutatorClient {
+
+ private static final long TRANSACTION_ID = 42L;
+ private static final String TABLE_NAME_2 = "TABLE_2";
+ private static final String TABLE_NAME_1 = "TABLE_1";
+ private static final String DB_NAME = "DB_1";
+ private static final String USER = "user";
+ private static final MutatorDestination.Impl TABLE_1 = new MutatorDestination.Impl(DB_NAME, TABLE_NAME_1, true);
+ private static final MutatorDestination.Impl TABLE_2 = new MutatorDestination.Impl(DB_NAME, TABLE_NAME_2, true);
+
+ @Mock
+ private IMetaStoreClient mockMetaStoreClient;
+ @Mock
+ private Lock mockLock;
+ @Mock
+ private Table mockTable;
+ @Mock
+ private StorageDescriptor mockSd;
+ @Mock
+ private Map<String, String> mockParameters;
+ @Mock
+ private HiveConf mockConfiguration;
+ @Mock
+ private LockFailureListener mockLockFailureListener;
+
+ private MutatorClient client;
+
+ @Before
+ public void configureMocks() throws Exception {
+ when(mockMetaStoreClient.getTable(anyString(), anyString())).thenReturn(mockTable);
+ when(mockTable.getSd()).thenReturn(mockSd);
+ when(mockTable.getParameters()).thenReturn(mockParameters);
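+ // consecutive stubbed values: the first table connected reports one bucket, the second reports two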
+ when(mockSd.getNumBuckets()).thenReturn(1, 2);
+ when(mockSd.getOutputFormat()).thenReturn(OrcOutputFormat.class.getName());
+ when(mockParameters.get("transactional")).thenReturn(Boolean.TRUE.toString());
+
+ when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+
+ client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER,
+ Collections.singletonList(TABLE_1));
+ }
+
+ @Test
+ public void testCheckValidTableConnect() throws Exception {
+ List<MutatorDestination> tables = new ArrayList<>();
+ tables.add(TABLE_1);
+ tables.add(TABLE_2);
+ client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, tables);
+
+ client.connect();
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ assertThat(client.isConnected(), is(true));
+ assertThat(destinations.size(), is(2));
+ assertThat(destinations.get(0).getDatabaseName(), is(DB_NAME));
+ assertThat(destinations.get(0).getTableName(), is(TABLE_NAME_1));
+ assertThat(destinations.get(0).getTotalBuckets(), is(1));
+ assertThat(destinations.get(0).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
+ assertThat(destinations.get(0).getTransactionId(), is(0L));
+ assertThat(destinations.get(1).getDatabaseName(), is(DB_NAME));
+ assertThat(destinations.get(1).getTableName(), is(TABLE_NAME_2));
+ assertThat(destinations.get(1).getTotalBuckets(), is(2));
+ assertThat(destinations.get(1).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
+ assertThat(destinations.get(1).getTransactionId(), is(0L));
+ }
+
+ @Test
+ public void testCheckNonTransactionalTableConnect() throws Exception {
+ when(mockParameters.get("transactional")).thenReturn(Boolean.FALSE.toString());
+
+ try {
+ client.connect();
+ fail();
+ } catch (ConnectionError e) {
+ }
+
+ assertThat(client.isConnected(), is(false));
+ }
+
+ @Test
+ public void testCheckUnBucketedTableConnect() throws Exception {
+ when(mockSd.getNumBuckets()).thenReturn(0);
+
+ try {
+ client.connect();
+ fail();
+ } catch (ConnectionError e) {
+ }
+
+ assertThat(client.isConnected(), is(false));
+ }
+
+ @Test
+ public void testMetaStoreFailsOnConnect() throws Exception {
+ when(mockMetaStoreClient.getTable(anyString(), anyString())).thenThrow(new TException());
+
+ try {
+ client.connect();
+ fail();
+ } catch (ConnectionError e) {
+ }
+
+ assertThat(client.isConnected(), is(false));
+ }
+
+ @Test(expected = ConnectionError.class)
+ public void testGetDestinationsFailsIfNotConnected() throws Exception {
+ client.getDestinations();
+ }
+
+ @Test
+ public void testNewTransaction() throws Exception {
+ List<MutatorDestination> tables = new ArrayList<>();
+ tables.add(TABLE_1);
+ tables.add(TABLE_2);
+ client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, tables);
+
+ client.connect();
+ Transaction transaction = client.newTransaction();
+ List<MutatorDestination> destinations = client.getDestinations();
+
+ assertThat(client.isConnected(), is(true));
+
+ assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
+ assertThat(transaction.getState(), is(TxnState.INACTIVE));
+ assertThat(destinations.get(0).getTransactionId(), is(TRANSACTION_ID));
+ assertThat(destinations.get(1).getTransactionId(), is(TRANSACTION_ID));
+ }
+
+ @Test
+ public void testCloseClosesClient() throws Exception {
+ client.close();
+ assertThat(client.isConnected(), is(false));
+ verify(mockMetaStoreClient).close();
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java
new file mode 100644
index 0000000..ef2db68
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java
@@ -0,0 +1,96 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.hcatalog.streaming.TransactionError;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestTransaction {
+
+ private static final String USER = "user";
+ private static final long TRANSACTION_ID = 10L;
+
+ @Mock
+ private Lock mockLock;
+ @Mock
+ private IMetaStoreClient mockMetaStoreClient;
+
+ private Transaction transaction;
+
+ @Before
+ public void createTransaction() throws Exception {
+ when(mockLock.getUser()).thenReturn(USER);
+ when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+ transaction = new Transaction(mockMetaStoreClient, mockLock);
+ }
+
+ @Test
+ public void testInitialState() {
+ assertThat(transaction.getState(), is(TransactionBatch.TxnState.INACTIVE));
+ assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
+ }
+
+ @Test
+ public void testBegin() throws Exception {
+ transaction.begin();
+
+ verify(mockLock).acquire(TRANSACTION_ID);
+ assertThat(transaction.getState(), is(TransactionBatch.TxnState.OPEN));
+ }
+
+ @Test
+ public void testBeginLockFails() throws Exception {
+ doThrow(new LockException("")).when(mockLock).acquire(TRANSACTION_ID);
+
+ try {
+ transaction.begin();
+ } catch (TransactionError ignore) {
+ }
+
+ assertThat(transaction.getState(), is(TransactionBatch.TxnState.INACTIVE));
+ }
+
+ @Test
+ public void testCommit() throws Exception {
+ transaction.commit();
+
+ verify(mockLock).release();
+ verify(mockMetaStoreClient).commitTxn(TRANSACTION_ID);
+ assertThat(transaction.getState(), is(TransactionBatch.TxnState.COMMITTED));
+ }
+
+ @Test(expected = TransactionError.class)
+ public void testCommitLockFails() throws Exception {
+ doThrow(new LockException("")).when(mockLock).release();
+ transaction.commit();
+ }
+
+ @Test
+ public void testAbort() throws Exception {
+ transaction.abort();
+
+ verify(mockLock).release();
+ verify(mockMetaStoreClient).rollbackTxn(TRANSACTION_ID);
+ assertThat(transaction.getState(), is(TransactionBatch.TxnState.ABORTED));
+ }
+
+ @Test(expected = TransactionError.class)
+ public void testAbortLockFails() throws Exception {
+ doThrow(new LockException("")).when(mockLock).release();
+ transaction.abort();
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java
new file mode 100644
index 0000000..8e6d06e
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java
@@ -0,0 +1,100 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestHeartbeatTimerTask {
+
+ private static final long TRANSACTION_ID = 10L;
+ private static final long LOCK_ID = 1L;
+ private static final List<Table> TABLES = createTable();
+
+ @Mock
+ private IMetaStoreClient mockMetaStoreClient;
+ @Mock
+ private LockFailureListener mockListener;
+
+ private HeartbeatTimerTask task;
+
+ @Before
+ public void create() throws Exception {
+ task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ task.run();
+
+ verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+ }
+
+ @Test
+ public void testRunNullTransactionId() throws Exception {
+ task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+
+ task.run();
+
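+ // a null transaction id is heartbeated as 0, i.e. the lock is not associated with an open transaction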
+ verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+ }
+
+ @Test
+ public void testRunHeartbeatFailsNoSuchLockException() throws Exception {
+ NoSuchLockException exception = new NoSuchLockException();
+ doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+ task.run();
+
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+ }
+
+ @Test
+ public void testRunHeartbeatFailsNoSuchTxnException() throws Exception {
+ NoSuchTxnException exception = new NoSuchTxnException();
+ doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+ task.run();
+
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+ }
+
+ @Test
+ public void testRunHeartbeatFailsTxnAbortedException() throws Exception {
+ TxnAbortedException exception = new TxnAbortedException();
+ doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+ task.run();
+
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+ }
+
+ @Test
+ public void testRunHeartbeatFailsTException() throws Exception {
+ TException exception = new TException();
+ doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+ task.run();
+ }
+
+ private static List<Table> createTable() {
+ Table table = new Table();
+ table.setDbName("DB");
+ table.setTableName("TABLE");
+ return Arrays.asList(table);
+ }
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
new file mode 100644
index 0000000..ef1e80c
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
@@ -0,0 +1,283 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import static org.apache.hadoop.hive.metastore.api.LockState.ABORT;
+import static org.apache.hadoop.hive.metastore.api.LockState.ACQUIRED;
+import static org.apache.hadoop.hive.metastore.api.LockState.NOT_ACQUIRED;
+import static org.apache.hadoop.hive.metastore.api.LockState.WAITING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Timer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.common.collect.ImmutableList;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestLock {
+
+ private static final Table TABLE_1 = createTable("DB", "ONE");
+ private static final Table TABLE_2 = createTable("DB", "TWO");
+ private static final List<Table> TABLES = ImmutableList.of(TABLE_1, TABLE_2);
+ private static final long LOCK_ID = 42;
+ private static final long TRANSACTION_ID = 109;
+ private static final String USER = "ewest";
+
+ @Mock
+ private IMetaStoreClient mockMetaStoreClient;
+ @Mock
+ private LockFailureListener mockListener;
+ @Mock
+ private LockResponse mockLockResponse;
+ @Mock
+ private HeartbeatFactory mockHeartbeatFactory;
+ @Mock
+ private Timer mockHeartbeat;
+ @Captor
+ private ArgumentCaptor<LockRequest> requestCaptor;
+
+ private Lock lock;
+ private HiveConf configuration = new HiveConf();
+
+ @Before
+ public void injectMocks() throws Exception {
+ when(mockMetaStoreClient.lock(any(LockRequest.class))).thenReturn(mockLockResponse);
+ when(mockLockResponse.getLockid()).thenReturn(LOCK_ID);
+ when(mockLockResponse.getState()).thenReturn(ACQUIRED);
+ when(
+ mockHeartbeatFactory.newInstance(any(IMetaStoreClient.class), any(LockFailureListener.class), any(Long.class),
+ any(Collection.class), anyLong(), anyInt())).thenReturn(mockHeartbeat);
+
+ lock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, TABLES, 3, 0);
+ }
+
+ @Test
+ public void testAcquireReadLockWithNoIssues() throws Exception {
+ lock.acquire();
+ assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+ assertNull(lock.getTransactionId());
+ }
+
+ @Test
+ public void testAcquireTxnLockWithNoIssues() throws Exception {
+ lock.acquire(TRANSACTION_ID);
+ assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+ assertEquals(Long.valueOf(TRANSACTION_ID), lock.getTransactionId());
+ }
+
+ @Test
+ public void testAcquireReadLockCheckHeartbeatCreated() throws Exception {
+ configuration.set("hive.txn.timeout", "100s");
+ lock.acquire();
+
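+ // the expected heartbeat period of 75 is presumably derived from the 100s hive.txn.timeout so that beats land before the lock can expire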
+ verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(TABLES),
+ eq(LOCK_ID), eq(75));
+ }
+
+ @Test
+ public void testAcquireTxnLockCheckHeartbeatCreated() throws Exception {
+ configuration.set("hive.txn.timeout", "100s");
+ lock.acquire(TRANSACTION_ID);
+
+ verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID), eq(TABLES),
+ eq(LOCK_ID), eq(75));
+ }
+
+ @Test
+ public void testAcquireLockCheckUser() throws Exception {
+ lock.acquire();
+ verify(mockMetaStoreClient).lock(requestCaptor.capture());
+ LockRequest actualRequest = requestCaptor.getValue();
+ assertEquals(USER, actualRequest.getUser());
+ }
+
+ @Test
+ public void testAcquireReadLockCheckLocks() throws Exception {
+ lock.acquire();
+ verify(mockMetaStoreClient).lock(requestCaptor.capture());
+
+ LockRequest request = requestCaptor.getValue();
+ assertEquals(0, request.getTxnid());
+ assertEquals(USER, request.getUser());
+ assertEquals(InetAddress.getLocalHost().getHostName(), request.getHostname());
+
+ List<LockComponent> components = request.getComponent();
+
+ assertEquals(2, components.size());
+
+ LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+ expected1.setTablename("ONE");
+ assertTrue(components.contains(expected1));
+
+ LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+ expected2.setTablename("TWO");
+ assertTrue(components.contains(expected2));
+ }
+
+ @Test
+ public void testAcquireTxnLockCheckLocks() throws Exception {
+ lock.acquire(TRANSACTION_ID);
+ verify(mockMetaStoreClient).lock(requestCaptor.capture());
+
+ LockRequest request = requestCaptor.getValue();
+ assertEquals(TRANSACTION_ID, request.getTxnid());
+ assertEquals(USER, request.getUser());
+ assertEquals(InetAddress.getLocalHost().getHostName(), request.getHostname());
+
+ List<LockComponent> components = request.getComponent();
+
+ System.out.println(components);
+ assertEquals(2, components.size());
+
+ LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+ expected1.setTablename("ONE");
+ assertTrue(components.contains(expected1));
+
+ LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+ expected2.setTablename("TWO");
+ assertTrue(components.contains(expected2));
+ }
+
+ @Test(expected = LockException.class)
+ public void testAcquireLockNotAcquired() throws Exception {
+ when(mockLockResponse.getState()).thenReturn(NOT_ACQUIRED);
+ lock.acquire();
+ }
+
+ @Test(expected = LockException.class)
+ public void testAcquireLockAborted() throws Exception {
+ when(mockLockResponse.getState()).thenReturn(ABORT);
+ lock.acquire();
+ }
+
+ @Test(expected = LockException.class)
+ public void testAcquireLockWithWaitRetriesExceeded() throws Exception {
+ when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, WAITING);
+ lock.acquire();
+ }
+
+ @Test
+ public void testAcquireLockWithWaitRetries() throws Exception {
+ when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, ACQUIRED);
+ lock.acquire();
+ assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+ }
+
+ @Test
+ public void testReleaseLock() throws Exception {
+ lock.acquire();
+ lock.release();
+ verify(mockMetaStoreClient).unlock(LOCK_ID);
+ }
+
+ @Test
+ public void testReleaseLockNoLock() throws Exception {
+ lock.release();
+ verifyNoMoreInteractions(mockMetaStoreClient);
+ }
+
+ @Test
+ public void testReleaseLockCancelsHeartbeat() throws Exception {
+ lock.acquire();
+ lock.release();
+ verify(mockHeartbeat).cancel();
+ }
+
+ @Test
+ public void testReadHeartbeat() throws Exception {
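+ // A read-only lock carries no transaction, so the heartbeat is expected to be issued with a transaction id of 0.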
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+ task.run();
+ verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+ }
+
+ @Test
+ public void testTxnHeartbeat() throws Exception {
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ task.run();
+ verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+ }
+
+ @Test
+ public void testReadHeartbeatFailsNoSuchLockException() throws Exception {
+ Throwable t = new NoSuchLockException();
+ doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+ task.run();
+ verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(TABLES), t);
+ }
+
+ @Test
+ public void testTxnHeartbeatFailsNoSuchLockException() throws Exception {
+ Throwable t = new NoSuchLockException();
+ doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ task.run();
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ }
+
+ @Test
+ public void testHeartbeatFailsNoSuchTxnException() throws Exception {
+ Throwable t = new NoSuchTxnException();
+ doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ task.run();
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ }
+
+ @Test
+ public void testHeartbeatFailsTxnAbortedException() throws Exception {
+ Throwable t = new TxnAbortedException();
+ doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ task.run();
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ }
+
+ @Test
+ public void testHeartbeatContinuesTException() throws Exception {
+ Throwable t = new TException();
+ doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ task.run();
+ verifyZeroInteractions(mockListener);
+ }
+
+ private static Table createTable(String databaseName, String tableName) {
+ Table table = new Table();
+ table.setDbName(databaseName);
+ table.setTableName(tableName);
+ return table;
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestAbstractBucketIdAppender.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestAbstractBucketIdAppender.java
new file mode 100644
index 0000000..88d56a3
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestAbstractBucketIdAppender.java
@@ -0,0 +1,38 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
+import org.junit.Test;
+
+public class TestAbstractBucketIdAppender {
+
+ private static final int TOTAL_BUCKETS = 12;
+ private static final int RECORD_ID_COLUMN = 2;
+ // id - TODO: use a non-zero index to check for offset errors.
+ private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 };
+
+ private BucketIdAppender capturingBucketIdAppender = new BucketIdAppenderImpl(
+ ObjectInspectorFactory.getReflectionObjectInspector(MutableRecord.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA), RECORD_ID_COLUMN, TOTAL_BUCKETS, BUCKET_COLUMN_INDEXES);
+
+ @Test
+ public void testAttachBucketIdToRecord() {
+ MutableRecord record = new MutableRecord(1, "hello");
+ capturingBucketIdAppender.attachBucketIdToRecord(record);
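+ // Inserts leave the transaction id and row id unset (-1); the bucket id of 8 presumably results from hashing the configured bucket column (id = 1) into TOTAL_BUCKETS.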
+ assertThat(record.rowId, is(new RecordIdentifier(-1L, 8, -1L)));
+ assertThat(record.id, is(1));
+ assertThat(record.msg.toString(), is("hello"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoBucketColumns() {
+ new BucketIdAppenderImpl(ObjectInspectorFactory.getReflectionObjectInspector(MutableRecord.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA), RECORD_ID_COLUMN, TOTAL_BUCKETS, new int[0]);
+
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestAbstractMutator.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestAbstractMutator.java
new file mode 100644
index 0000000..5c0a9f8
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestAbstractMutator.java
@@ -0,0 +1,114 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestAbstractMutator {
+
+ private static final Object RECORD = new Object();
+ private static final int RECORD_ID_COLUMN = 2;
+ private static final int BUCKET_ID = 0;
+ private static final Path PATH = new Path("X");
+ private static final long TRANSACTION_ID = 1L;
+
+ @Mock
+ private AcidOutputFormat<?, ?> mockOutputFormat;
+ @Mock
+ private ObjectInspector mockObjectInspector;
+ @Mock
+ private RecordUpdater mockRecordUpdater;
+ @Captor
+ private ArgumentCaptor<Options> captureOptions;
+
+ private HiveConf configuration = new HiveConf();
+
+ private AbstractMutator createStub() throws IOException {
+ return new AbstractMutator(configuration, mockOutputFormat, TRANSACTION_ID, PATH, BUCKET_ID) {
+
+ @Override
+ protected int getRecordIdColumn() {
+ return RECORD_ID_COLUMN;
+ }
+
+ @Override
+ protected ObjectInspector getObjectInspector() {
+ return mockObjectInspector;
+ }
+ };
+ }
+
+ @Test
+ public void testCreatesRecordUpdater() throws IOException {
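+ // Constructing the mutator should immediately request a RecordUpdater configured with the bucket, inspector, record id column and transaction range.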
+ AbstractMutator createStub = createStub();
+ verify(mockOutputFormat).getRecordUpdater(eq(PATH), captureOptions.capture());
+ Options options = captureOptions.getValue();
+ assertThat(options.getBucket(), is(BUCKET_ID));
+ assertThat(options.getConfiguration(), is((Configuration) configuration));
+ assertThat(options.getInspector(), is(mockObjectInspector));
+ assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN));
+ assertThat(options.getMinimumTransactionId(), is(TRANSACTION_ID));
+ assertThat(options.getMaximumTransactionId(), is(TRANSACTION_ID));
+ }
+
+ @Test
+ public void testInsertDelegates() throws IOException {
+ when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater);
+ AbstractMutator stub = createStub();
+ stub.insert(RECORD);
+ verify(mockRecordUpdater).insert(TRANSACTION_ID, RECORD);
+ }
+
+ @Test
+ public void testUpdateDelegates() throws IOException {
+ when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater);
+ AbstractMutator stub = createStub();
+ stub.update(RECORD);
+ verify(mockRecordUpdater).update(TRANSACTION_ID, RECORD);
+ }
+
+ @Test
+ public void testDeleteDelegates() throws IOException {
+ when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater);
+ AbstractMutator stub = createStub();
+ stub.delete(RECORD);
+ verify(mockRecordUpdater).delete(TRANSACTION_ID, RECORD);
+ }
+
+ @Test
+ public void testCloseDelegates() throws IOException {
+ when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater);
+ AbstractMutator stub = createStub();
+ stub.close();
+ verify(mockRecordUpdater).close(false);
+ }
+
+ @Test
+ public void testFlushDelegates() throws IOException {
+ when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater);
+ AbstractMutator stub = createStub();
+ stub.flush();
+ verify(mockRecordUpdater).flush();
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
new file mode 100644
index 0000000..14251ef
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
@@ -0,0 +1,183 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorDestination;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMutatorCoordinator {
+
+ private static final List<String> UNPARTITIONED = Collections.<String> emptyList();
+ private static final List<String> PARTITION_B = Arrays.asList("B");
+ private static final List<String> PARTITION_A = Arrays.asList("A");
+ private static final long TRANSACTION_ID = 2L;
+ private static final int BUCKET_ID = 0;
+ private static final Path PATH_A = new Path("X");
+ private static final Path PATH_B = new Path("B");
+ private static final Object RECORD = "RECORD";
+ private static final RecordIdentifier ROW__ID_B0_R0 = new RecordIdentifier(10L, BUCKET_ID, 0L);
+ private static final RecordIdentifier ROW__ID_B0_R1 = new RecordIdentifier(10L, BUCKET_ID, 1L);
+ private static final RecordIdentifier ROW__ID_B1_R0 = new RecordIdentifier(10L, BUCKET_ID + 1, 0L);
+ private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, -1L);
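+ // RecordIdentifiers encode (transaction id, bucket id, row id); the -1 sentinels in ROW__ID_INSERT denote a record that has not yet been written.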
+
+ @Mock
+ private IMetaStoreClient mockMetaStoreClient;
+ @Mock
+ private MutatorFactory mockMutatorFactory;
+ @Mock
+ private CreatePartitionHelper mockPartitionHelper;
+ @Mock
+ private SequenceValidator mockSequenceValidator;
+ @Mock
+ private MutatorDestination mockDestination;
+ @Mock
+ private RecordInspector mockRecordInspector;
+ @Mock
+ private Mutator mockMutator;
+
+ private MutatorCoordinator coordinator;
+
+ private HiveConf configuration = new HiveConf();
+
+ @Before
+ public void createCoordinator() throws Exception {
+ when(mockDestination.getOutputFormatName()).thenReturn(OrcOutputFormat.class.getName());
+ when(mockDestination.getTotalBuckets()).thenReturn(1);
+ when(mockDestination.getTransactionId()).thenReturn(TRANSACTION_ID);
+ when(mockDestination.createPartitions()).thenReturn(true);
+ when(mockMutatorFactory.newRecordInspector()).thenReturn(mockRecordInspector);
+ when(mockMutatorFactory.newMutator(any(OrcOutputFormat.class), anyLong(), any(Path.class), anyInt())).thenReturn(
+ mockMutator);
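+ // By default every partition resolves to PATH_A and every record is reported as a new (unwritten) row.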
+ when(mockPartitionHelper.getPathForPartition(any(List.class))).thenReturn(PATH_A);
+ when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_INSERT);
+
+ coordinator = new MutatorCoordinator(mockMetaStoreClient, configuration, mockMutatorFactory, mockPartitionHelper,
+ mockSequenceValidator, mockDestination);
+ }
+
+ @Test
+ public void insert() throws Exception {
+ coordinator.insert(UNPARTITIONED, RECORD);
+
+ verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutator).insert(RECORD);
+ }
+
+ @Test
+ public void multipleInserts() throws Exception {
+ coordinator.insert(UNPARTITIONED, RECORD);
+ coordinator.insert(UNPARTITIONED, RECORD);
+ coordinator.insert(UNPARTITIONED, RECORD);
+
+ verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutator, times(3)).insert(RECORD);
+ }
+
+ @Test
+ public void insertPartitionChanges() throws Exception {
+ when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A);
+ when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
+
+ coordinator.insert(PARTITION_A, RECORD);
+ coordinator.insert(PARTITION_B, RECORD);
+
+ verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
+ verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID));
+ verify(mockMutator, times(2)).insert(RECORD);
+ }
+
+ @Test
+ public void bucketChanges() throws Exception {
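+ // Consecutive records that land in different buckets should force the coordinator to open a second mutator for the new bucket.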
+ when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_B1_R0);
+
+ coordinator.update(UNPARTITIONED, RECORD);
+ coordinator.delete(UNPARTITIONED, RECORD);
+
+ verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory)
+ .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1));
+ verify(mockMutator).update(RECORD);
+ verify(mockMutator).delete(RECORD);
+ }
+
+ @Test
+ public void partitionThenBucketChanges() throws Exception {
+ when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_B0_R1, ROW__ID_B1_R0,
+ ROW__ID_INSERT);
+
+ when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A);
+ when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
+
+ coordinator.update(PARTITION_A, RECORD);
+ coordinator.delete(PARTITION_B, RECORD);
+ coordinator.update(PARTITION_B, RECORD);
+ coordinator.insert(PARTITION_B, RECORD);
+
+ verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
+ verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B),
+ eq(BUCKET_ID));
+ verify(mockMutatorFactory)
+ .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID + 1));
+ verify(mockMutator, times(2)).update(RECORD);
+ verify(mockMutator).delete(RECORD);
+ verify(mockMutator).insert(RECORD);
+ verify(mockSequenceValidator, times(4)).reset();
+ }
+
+ @Test(expected = MutatorException.class)
+ public void outOfSequence() throws Exception {
+ when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(false);
+
+ coordinator.update(UNPARTITIONED, RECORD);
+ coordinator.delete(UNPARTITIONED, RECORD);
+
+ verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutator).update(RECORD);
+ verify(mockMutator).delete(RECORD);
+ }
+
+ @Test
+ public void closeNoRecords() throws Exception {
+ coordinator.close();
+
+ // No mutator created
+ verifyZeroInteractions(mockMutator);
+ }
+
+ @Test
+ public void closeUsedCoordinator() throws Exception {
+ coordinator.insert(UNPARTITIONED, RECORD);
+ coordinator.close();
+
+ verify(mockMutator).close();
+ }
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java
new file mode 100644
index 0000000..389ad33
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java
@@ -0,0 +1,31 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
+import org.junit.Test;
+
+public class TestRecordInspectorImpl {
+
+ private static final int ROW_ID_COLUMN = 2;
+
+ private RecordInspectorImpl inspector = new RecordInspectorImpl(ObjectInspectorFactory.getReflectionObjectInspector(
+ MutableRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA), ROW_ID_COLUMN);
+
+ @Test
+ public void testExtractRecordIdentifier() {
+ RecordIdentifier recordIdentifier = new RecordIdentifier(10L, 4, 20L);
+ MutableRecord record = new MutableRecord(1, "hello", recordIdentifier);
+ assertThat(inspector.extractRecordIdentifier(record), is(recordIdentifier));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNotAStructObjectInspector() {
+ new RecordInspectorImpl(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, 2);
+ }
+
+}
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java
new file mode 100644
index 0000000..33f9606
--- /dev/null
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java
@@ -0,0 +1,91 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.junit.Test;
+
+public class TestSequenceValidator {
+
+ private static final int BUCKET_ID = 1;
+
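+ // The validator appears to require that records within a bucket arrive in ascending (transaction id, row id) order; reset() clears its state, for example when a new partition or bucket is started.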
+ private SequenceValidator validator = new SequenceValidator();
+
+ @Test
+ public void testSingleInSequence() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ }
+
+ @Test
+ public void testRowIdInSequence() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true));
+ }
+
+ @Test
+ public void testTxIdInSequence() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, 0)), is(true));
+ }
+
+ @Test
+ public void testMixedInSequence() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 1)), is(true));
+ }
+
+ @Test
+ public void testNegativeTxId() {
+ assertThat(validator.isInSequence(new RecordIdentifier(-1L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ }
+
+ @Test
+ public void testNegativeRowId() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, -1)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ }
+
+ @Test
+ public void testRowIdOutOfSequence() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(false));
+ }
+
+ @Test
+ public void testReset() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true));
+ // New partition for example
+ validator.reset();
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true));
+ }
+
+ @Test
+ public void testTxIdOutOfSequence() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(false));
+ }
+
+ @Test
+ public void testMixedOutOfSequence() {
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 4)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(false));
+ assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 5)), is(true));
+ assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 6)), is(false));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullRecordIdentifier() {
+ validator.isInSequence(null);
+ }
+
+}