commit 015b7b24fd0bbb632e108b5342fa2cd3b36db158
Author: David Lavati
Date:   Fri Sep 27 12:34:58 2019 +0200

    HIVE-21146 Enforce TransactionBatch size=1 for blob stores

    Change-Id: Ia5f94c34a044c2990e95204de03b661d162874c7

diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f4e71f915b..21734496e9 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -520,6 +522,33 @@ private void validateTable() throws InvalidTable, ConnectionError {
       LOG.error(errMsg);
       throw new ConnectionError(errMsg);
     }
+
+    // batch size is only used for managed transactions, not for unmanaged single transactions.
+    // Batches > 1 require the backing filesystem to support hflush (e.g. HDFS); blob stores
+    // typically do not, so probe the capability with a throw-away file before accepting the size.
+    if (transactionBatchSize > 1) {
+      Path location = tableObject.getDataLocation();
+      try {
+        // Do NOT close this FileSystem: Path.getFileSystem() returns Hadoop's process-wide
+        // cached instance, and closing it would break every other user of the same filesystem.
+        FileSystem fs = location.getFileSystem(conf);
+        Path probe = new Path(location, "tmp_stream_verify_" + UUID.randomUUID().toString());
+        try (FSDataOutputStream out = fs.create(probe, false)) {
+          if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
+            throw new ConnectionError(
+                "The backing filesystem only supports transaction batch sizes of 1, but " + transactionBatchSize
+                    + " was requested.");
+          }
+        } finally {
+          // The probe has served its purpose; delete it now instead of deferring to JVM exit.
+          try {
+            fs.delete(probe, false);
+          } catch (IOException ignored) {
+            // Best-effort cleanup: a leftover probe file is harmless and must not mask the result.
+          }
+        }
+      } catch (IOException e) {
+        throw new ConnectionError("Could not verify stream capabilities of filesystem at " + location, e);
+      }
+    }
   }
 
   private void beginNextTransaction() throws StreamingException {
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index dbff263aed..114056d4c6 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -1314,6 +1314,40 @@ public void testTransactionBatchEmptyCommit() throws Exception {
     connection.close();
   }
 
+  @Test
+  public void testTransactionBatchSizeValidation() throws Exception {
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+
+    // A batch size of 1 must always be accepted, regardless of filesystem capabilities.
+    HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withRecordWriter(writer)
+        .withTransactionBatchSize(1)
+        .withHiveConf(conf)
+        .connect()
+        .close();
+
+    // The OutputStream provided by this FS doesn't implement Syncable, so it won't support Hflush
+    try {
+      HiveStreamingConnection.newBuilder()
+          .withDatabase(dbName)
+          .withTable(tblName)
+          .withAgentInfo("UT_" + Thread.currentThread().getName())
+          .withRecordWriter(writer)
+          .withTransactionBatchSize(2)
+          .withHiveConf(conf)
+          .connect();
+      Assert.fail("Expected ConnectionError: batch size > 1 should be rejected on this filesystem");
+    } catch (ConnectionError e) {
+      Assert.assertTrue("Expected connection error due to batch sizes",
+          e.getMessage().contains("only supports transaction batch"));
+    }
+  }
+
   /**
    * check that transactions that have not heartbeated and timedout get properly aborted
    *