diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
index 2724525..29b828d 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
@@ -42,7 +42,16 @@
.lockFailureListener(lockFailureListener == null ? LockFailureListener.NULL_LISTENER : lockFailureListener)
.user(user);
for (AcidTable table : tables) {
- lockOptions.addTable(table.getDatabaseName(), table.getTableName());
+ switch (table.getTableType()) {
+ case SOURCE:
+ lockOptions.addSourceTable(table.getDatabaseName(), table.getTableName());
+ break;
+ case SINK:
+ lockOptions.addSinkTable(table.getDatabaseName(), table.getTableName());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown TableType: " + table.getTableType());
+ }
}
}
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
index 21604df..ad0b303 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
@@ -2,6 +2,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@@ -35,7 +36,8 @@
private final IMetaStoreClient metaStoreClient;
private final HeartbeatFactory heartbeatFactory;
private final LockFailureListener listener;
- private final Collection
tableDescriptors;
+ private final Collection sinks;
+ private final Collection tables = new HashSet<>();
private final int lockRetries;
private final int retryWaitSeconds;
private final String user;
@@ -46,23 +48,26 @@
private Long transactionId;
public Lock(IMetaStoreClient metaStoreClient, Options options) {
- this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, options.listener, options.user,
- options.descriptors, options.lockRetries, options.retryWaitSeconds);
+ this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, options.listener, options.user, options.sources,
+ options.sinks, options.lockRetries, options.retryWaitSeconds);
}
/** Visible for testing only. */
Lock(IMetaStoreClient metaStoreClient, HeartbeatFactory heartbeatFactory, HiveConf hiveConf,
- LockFailureListener listener, String user, Collection tableDescriptors, int lockRetries,
+ LockFailureListener listener, String user, Collection sources, Collection sinks, int lockRetries,
int retryWaitSeconds) {
this.metaStoreClient = metaStoreClient;
this.heartbeatFactory = heartbeatFactory;
this.hiveConf = hiveConf;
this.user = user;
- this.tableDescriptors = tableDescriptors;
this.listener = listener;
this.lockRetries = lockRetries;
this.retryWaitSeconds = retryWaitSeconds;
+ this.sinks = sinks;
+ tables.addAll(sources);
+ tables.addAll(sinks);
+
if (LockFailureListener.NULL_LISTENER.equals(listener)) {
LOG.warn("No {} supplied. Data quality and availability cannot be assured.",
LockFailureListener.class.getSimpleName());
@@ -77,6 +82,9 @@ public void acquire() throws LockException {
/** Attempts to acquire a read lock on the table, returns if successful, throws exception otherwise. */
public void acquire(long transactionId) throws LockException {
+ if (transactionId <= 0) {
+ throw new IllegalArgumentException("Invalid transaction id: " + transactionId);
+ }
lockId = internalAcquire(transactionId);
this.transactionId = transactionId;
initiateHeartbeat();
@@ -96,19 +104,18 @@ public String getUser() {
@Override
public String toString() {
- return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + ", transactionId=" + transactionId
- + "]";
+ return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + ", transactionId=" + transactionId + "]";
}
private long internalAcquire(Long transactionId) throws LockException {
int attempts = 0;
- LockRequest request = buildSharedLockRequest(transactionId);
+ LockRequest request = buildLockRequest(transactionId);
do {
LockResponse response = null;
try {
response = metaStoreClient.lock(request);
} catch (TException e) {
- throw new LockException("Unable to acquire lock for tables: [" + join(tableDescriptors) + "]", e);
+ throw new LockException("Unable to acquire lock for tables: [" + join(tables) + "]", e);
}
if (response != null) {
LockState state = response.getState();
@@ -129,7 +136,7 @@ private long internalAcquire(Long transactionId) throws LockException {
}
attempts++;
} while (attempts < lockRetries);
- throw new LockException("Could not acquire lock on tables: [" + join(tableDescriptors) + "]");
+ throw new LockException("Could not acquire lock on tables: [" + join(tables) + "]");
}
private void internalRelease() {
@@ -142,18 +149,24 @@ private void internalRelease() {
}
} catch (TException e) {
LOG.error("Lock " + lockId + " failed.", e);
- listener.lockFailed(lockId, transactionId, asStrings(tableDescriptors), e);
+ listener.lockFailed(lockId, transactionId, asStrings(tables), e);
}
}
- private LockRequest buildSharedLockRequest(Long transactionId) {
+ private LockRequest buildLockRequest(Long transactionId) {
+ if (transactionId == null && !sinks.isEmpty()) {
+ throw new IllegalArgumentException("Cannot sink to tables outside of a transaction: sinks=" + asStrings(sinks));
+ }
LockRequestBuilder requestBuilder = new LockRequestBuilder();
- for (Table descriptor : tableDescriptors) {
- LockComponent component = new LockComponentBuilder()
- .setDbName(descriptor.getDbName())
- .setTableName(descriptor.getTableName())
- .setShared()
- .build();
+ for (Table table : tables) {
+ LockComponentBuilder componentBuilder = new LockComponentBuilder().setDbName(table.getDbName()).setTableName(
+ table.getTableName());
+ if (sinks.contains(table)) {
+ componentBuilder.setSemiShared();
+ } else {
+ componentBuilder.setShared();
+ }
+ LockComponent component = componentBuilder.build();
requestBuilder.addLockComponent(component);
}
if (transactionId != null) {
@@ -166,8 +179,7 @@ private LockRequest buildSharedLockRequest(Long transactionId) {
private void initiateHeartbeat() {
int heartbeatPeriod = getHeartbeatPeriod();
LOG.debug("Heartbeat period {}s", heartbeatPeriod);
- heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, transactionId, tableDescriptors, lockId,
- heartbeatPeriod);
+ heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, transactionId, tables, lockId, heartbeatPeriod);
}
private int getHeartbeatPeriod() {
@@ -210,22 +222,33 @@ static String join(Iterable extends Object> values) {
/** Constructs a lock options for a set of Hive ACID tables from which we wish to read. */
public static final class Options {
- Set descriptors = new LinkedHashSet<>();
+ Set sources = new LinkedHashSet<>();
+ Set sinks = new LinkedHashSet<>();
LockFailureListener listener = LockFailureListener.NULL_LISTENER;
int lockRetries = 5;
int retryWaitSeconds = 30;
String user;
HiveConf hiveConf;
- /** Adds a table for which a shared read lock will be requested. */
- public Options addTable(String databaseName, String tableName) {
+ /** Adds a table for which a shared lock will be requested. */
+ public Options addSourceTable(String databaseName, String tableName) {
+ addTable(databaseName, tableName, sources);
+ return this;
+ }
+
+ /** Adds a table for which a semi-shared lock will be requested. */
+ public Options addSinkTable(String databaseName, String tableName) {
+ addTable(databaseName, tableName, sinks);
+ return this;
+ }
+
+ private void addTable(String databaseName, String tableName, Set tables) {
checkNotNullOrEmpty(databaseName);
checkNotNullOrEmpty(tableName);
Table table = new Table();
table.setDbName(databaseName);
table.setTableName(tableName);
- descriptors.add(table);
- return this;
+ tables.add(table);
}
public Options user(String user) {
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
index 9fc10b6..09a55b6 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
@@ -421,7 +421,7 @@
admin to pre-create the necessary partitions may not be reasonable.
Consequently the API allows coordinators to create partitions as needed
(see:
-MutatorClientBuilder.addTable(String, String, boolean)
+MutatorClientBuilder.addSinkTable(String, String, boolean)
). Partition creation being an atomic action, multiple coordinators can
race to create the partition, but only one would succeed, so
coordinators clients need not synchronize when creating a partition. The
@@ -440,14 +440,14 @@
- Obtaining a valid transaction list from the meta store (
ValidTxnList).
-- Acquiring a read-lock with the meta store and issuing
-heartbeats (
LockImpl can help with this).
+ - Acquiring a lock with the meta store and issuing heartbeats (
LockImpl
+can help with this).
- Configuring the
OrcInputFormat and then reading
the data. Make sure that you also pull in the ROW__ID
values. See: AcidRecordReader.getRecordIdentifier.
-- Releasing the read-lock.
+- Releasing the lock.
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
index ef1e80c..05f342b 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
@@ -19,7 +19,9 @@
import java.net.InetAddress;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.Timer;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,14 +44,17 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
@RunWith(MockitoJUnitRunner.class)
public class TestLock {
- private static final Table TABLE_1 = createTable("DB", "ONE");
- private static final Table TABLE_2 = createTable("DB", "TWO");
- private static final List TABLES = ImmutableList.of(TABLE_1, TABLE_2);
+ private static final Table SOURCE_TABLE_1 = createTable("DB", "SOURCE_1");
+ private static final Table SOURCE_TABLE_2 = createTable("DB", "SOURCE_2");
+ private static final Table SINK_TABLE = createTable("DB", "SINK");
+ private static final Set SOURCES = ImmutableSet.of(SOURCE_TABLE_1, SOURCE_TABLE_2);
+ private static final Set SINKS = ImmutableSet.of(SINK_TABLE);
+ private static final Set TABLES = ImmutableSet.of(SOURCE_TABLE_1, SOURCE_TABLE_2, SINK_TABLE);
private static final long LOCK_ID = 42;
private static final long TRANSACTION_ID = 109;
private static final String USER = "ewest";
@@ -67,7 +72,8 @@
@Captor
private ArgumentCaptor requestCaptor;
- private Lock lock;
+ private Lock readLock;
+ private Lock writeLock;
private HiveConf configuration = new HiveConf();
@Before
@@ -79,44 +85,57 @@ public void injectMocks() throws Exception {
mockHeartbeatFactory.newInstance(any(IMetaStoreClient.class), any(LockFailureListener.class), any(Long.class),
any(Collection.class), anyLong(), anyInt())).thenReturn(mockHeartbeat);
- lock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, TABLES, 3, 0);
+ readLock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, SOURCES,
+ Collections. emptySet(), 3, 0);
+ writeLock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, SOURCES, SINKS,
+ 3, 0);
}
@Test
public void testAcquireReadLockWithNoIssues() throws Exception {
- lock.acquire();
- assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
- assertNull(lock.getTransactionId());
+ readLock.acquire();
+ assertEquals(Long.valueOf(LOCK_ID), readLock.getLockId());
+ assertNull(readLock.getTransactionId());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAcquireWriteLockWithoutTxn() throws Exception {
+ writeLock.acquire();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAcquireWriteLockWithInvalidTxn() throws Exception {
+ writeLock.acquire(0);
}
@Test
public void testAcquireTxnLockWithNoIssues() throws Exception {
- lock.acquire(TRANSACTION_ID);
- assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
- assertEquals(Long.valueOf(TRANSACTION_ID), lock.getTransactionId());
+ writeLock.acquire(TRANSACTION_ID);
+ assertEquals(Long.valueOf(LOCK_ID), writeLock.getLockId());
+ assertEquals(Long.valueOf(TRANSACTION_ID), writeLock.getTransactionId());
}
@Test
public void testAcquireReadLockCheckHeartbeatCreated() throws Exception {
configuration.set("hive.txn.timeout", "100s");
- lock.acquire();
+ readLock.acquire();
- verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(TABLES),
+ verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(SOURCES),
eq(LOCK_ID), eq(75));
}
@Test
public void testAcquireTxnLockCheckHeartbeatCreated() throws Exception {
configuration.set("hive.txn.timeout", "100s");
- lock.acquire(TRANSACTION_ID);
+ writeLock.acquire(TRANSACTION_ID);
- verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID), eq(TABLES),
- eq(LOCK_ID), eq(75));
+ verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID),
+ eq(TABLES), eq(LOCK_ID), eq(75));
}
@Test
public void testAcquireLockCheckUser() throws Exception {
- lock.acquire();
+ readLock.acquire();
verify(mockMetaStoreClient).lock(requestCaptor.capture());
LockRequest actualRequest = requestCaptor.getValue();
assertEquals(USER, actualRequest.getUser());
@@ -124,7 +143,7 @@ public void testAcquireLockCheckUser() throws Exception {
@Test
public void testAcquireReadLockCheckLocks() throws Exception {
- lock.acquire();
+ readLock.acquire();
verify(mockMetaStoreClient).lock(requestCaptor.capture());
LockRequest request = requestCaptor.getValue();
@@ -137,17 +156,17 @@ public void testAcquireReadLockCheckLocks() throws Exception {
assertEquals(2, components.size());
LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
- expected1.setTablename("ONE");
+ expected1.setTablename("SOURCE_1");
assertTrue(components.contains(expected1));
LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
- expected2.setTablename("TWO");
+ expected2.setTablename("SOURCE_2");
assertTrue(components.contains(expected2));
}
@Test
public void testAcquireTxnLockCheckLocks() throws Exception {
- lock.acquire(TRANSACTION_ID);
+ writeLock.acquire(TRANSACTION_ID);
verify(mockMetaStoreClient).lock(requestCaptor.capture());
LockRequest request = requestCaptor.getValue();
@@ -157,73 +176,77 @@ public void testAcquireTxnLockCheckLocks() throws Exception {
List components = request.getComponent();
- System.out.println(components);
- assertEquals(2, components.size());
+ assertEquals(3, components.size());
LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
- expected1.setTablename("ONE");
+ expected1.setTablename("SOURCE_1");
assertTrue(components.contains(expected1));
LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
- expected2.setTablename("TWO");
+ expected2.setTablename("SOURCE_2");
assertTrue(components.contains(expected2));
+
+ LockComponent expected3 = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "DB");
+ expected3.setTablename("SINK");
+ assertTrue(components.contains(expected3));
}
@Test(expected = LockException.class)
public void testAcquireLockNotAcquired() throws Exception {
when(mockLockResponse.getState()).thenReturn(NOT_ACQUIRED);
- lock.acquire();
+ readLock.acquire();
}
@Test(expected = LockException.class)
public void testAcquireLockAborted() throws Exception {
when(mockLockResponse.getState()).thenReturn(ABORT);
- lock.acquire();
+ readLock.acquire();
}
@Test(expected = LockException.class)
public void testAcquireLockWithWaitRetriesExceeded() throws Exception {
when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, WAITING);
- lock.acquire();
+ readLock.acquire();
}
@Test
public void testAcquireLockWithWaitRetries() throws Exception {
when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, ACQUIRED);
- lock.acquire();
- assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+ readLock.acquire();
+ assertEquals(Long.valueOf(LOCK_ID), readLock.getLockId());
}
@Test
public void testReleaseLock() throws Exception {
- lock.acquire();
- lock.release();
+ readLock.acquire();
+ readLock.release();
verify(mockMetaStoreClient).unlock(LOCK_ID);
}
@Test
public void testReleaseLockNoLock() throws Exception {
- lock.release();
+ readLock.release();
verifyNoMoreInteractions(mockMetaStoreClient);
}
@Test
public void testReleaseLockCancelsHeartbeat() throws Exception {
- lock.acquire();
- lock.release();
+ readLock.acquire();
+ readLock.release();
verify(mockHeartbeat).cancel();
}
@Test
public void testReadHeartbeat() throws Exception {
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, SOURCES, LOCK_ID);
task.run();
verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
}
@Test
public void testTxnHeartbeat() throws Exception {
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
}
@@ -232,43 +255,47 @@ public void testTxnHeartbeat() throws Exception {
public void testReadHeartbeatFailsNoSuchLockException() throws Exception {
Throwable t = new NoSuchLockException();
doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, SOURCES, LOCK_ID);
task.run();
- verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(TABLES), t);
+ verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(SOURCES), t);
}
@Test
public void testTxnHeartbeatFailsNoSuchLockException() throws Exception {
Throwable t = new NoSuchLockException();
doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
- verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t);
}
@Test
public void testHeartbeatFailsNoSuchTxnException() throws Exception {
Throwable t = new NoSuchTxnException();
doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
- verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t);
}
@Test
public void testHeartbeatFailsTxnAbortedException() throws Exception {
Throwable t = new TxnAbortedException();
doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
- verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t);
}
@Test
public void testHeartbeatContinuesTException() throws Exception {
Throwable t = new TException();
doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
verifyZeroInteractions(mockListener);
}