commit 6a47a1f2fd01fc6a1cacdacfd077fe79268cdd3f
Author: David Lavati
Date:   Fri Sep 27 12:34:58 2019 +0200

    HIVE-21146 Enforce TransactionBatch size=1 for blob stores

    Change-Id: Ia5f94c34a044c2990e95204de03b661d162874c7

diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f4e71f915b..31223ba766 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -32,8 +32,11 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -520,6 +523,35 @@ private void validateTable() throws InvalidTable, ConnectionError {
       LOG.error(errMsg);
       throw new ConnectionError(errMsg);
     }
+
+    // batch size is only used for managed transactions, not for unmanaged single transactions
+    if (transactionBatchSize > 1) {
+      // Do NOT close this FileSystem: Path.getFileSystem() returns the JVM-wide cached
+      // instance, and closing it here would break every other consumer of the cached object.
+      try {
+        FileSystem fs = tableObject.getDataLocation().getFileSystem(conf);
+        if (BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) {
+          // currently not all filesystems implement StreamCapabilities, while FSDataOutputStream does
+          Path path = new Path("/tmp", "_tmp_stream_verify_" + UUID.randomUUID().toString());
+          try (FSDataOutputStream out = fs.create(path, false)) {
+            if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
+              throw new ConnectionError(
+                  "The backing filesystem only supports transaction batch sizes of 1, but " + transactionBatchSize
+                      + " was requested.");
+            }
+          } finally {
+            // best-effort cleanup of the probe file on success and failure paths alike
+            try {
+              fs.delete(path, false);
+            } catch (IOException ignored) {
+              // cleanup failure must not mask the capability-check outcome
+            }
+          }
+        }
+      } catch (IOException e) {
+        throw new ConnectionError("Could not verify the table filesystem's stream capabilities", e);
+      }
+    }
   }
 
   private void beginNextTransaction() throws StreamingException {
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 055672f910..58b3ae2bd4 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -114,7 +114,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TestStreaming {
   private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
 
@@ -1314,6 +1313,35 @@ public void testTransactionBatchEmptyCommit() throws Exception {
     connection.close();
   }
 
+  @Test
+  public void testTransactionBatchSizeValidation() throws Exception {
+    final String schemes = conf.get(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);
+    // the output stream of this FS doesn't support hflush, so the below test will fail
+    conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, "raw");
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+
+    try {
+      HiveStreamingConnection.newBuilder()
+          .withDatabase(dbName)
+          .withTable(tblName)
+          .withAgentInfo("UT_" + Thread.currentThread().getName())
+          .withRecordWriter(writer)
+          .withTransactionBatchSize(2)
+          .withHiveConf(conf)
+          .connect();
+
+      Assert.fail("connect() should reject batch size > 1 on a blob store filesystem");
+    } catch (ConnectionError e) {
+      Assert.assertTrue("Expected connection error due to batch sizes",
+          e.getMessage().contains("only supports transaction batch"));
+    } finally {
+      conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, schemes);
+    }
+  }
+
 /**
  * check that transactions that have not heartbeated and timedout get properly aborted
 *