diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java new file mode 100644 index 0000000000..9a71c4b0f1 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hive.streaming.HiveStreamingConnection; +import org.apache.hive.streaming.StrictDelimitedInputWriter; +import org.apache.hive.streaming.StreamingConnection; + +import org.junit.rules.TestName; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; + +/** + * TestReplicationOfHiveStreaming - test replication for streaming ingest on ACID tables + */ +public class TestReplicationOfHiveStreaming { + + @Rule + public final TestName testName = new TestName(); + + protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationOfHiveStreaming.class); + private static WarehouseInstance primary; + private static WarehouseInstance replica; + private static String primaryDbName; + private static String replicatedDbName; + + @BeforeClass + public static void classLevelSetup() throws Exception { + HashMap overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + + internalBeforeClassSetup(overrides, TestReplicationOfHiveStreaming.class); + } + + static void internalBeforeClassSetup(Map overrides, + Class clazz) throws Exception { + + HiveConf conf = new HiveConf(clazz); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + HashMap acidEnableConf = new HashMap() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.in.repl.test", "true"); + }}; + + acidEnableConf.putAll(overrides); + + primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf); + replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf); + } + + @AfterClass + public static void classLevelTearDown() throws IOException { + primary.close(); + replica.close(); + } + + @Before + public void setup() throws Throwable { + primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); + replicatedDbName = "replicated_" + primaryDbName; + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + primaryDbName + " cascade"); + replica.run("drop database if exists " + replicatedDbName + " cascade"); + } + + @Test + public void testHiveStreamingUnpartitionedWithTxnBatchSizeAsOne() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null); + replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation); + + // Create an ACID table. + String tblName = "alerts"; + primary.run("use " + primaryDbName) + .run("create table " + tblName + "( id int , msg string ) " + + "clustered by (id) into 5 buckets " + + "stored as orc tblproperties(\"transactional\"=\"true\")"); + + // Create delimited record writer whose schema exactly matches table schema + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + // Create and open streaming connection (default.src table has to exist already) + // By default, txn batch size is 1. + StreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(primaryDbName) + .withTable(tblName) + .withAgentInfo("example-agent-1") + .withRecordWriter(writer) + .withHiveConf(primary.getConf()) + .connect(); + + // Begin a transaction, write records and commit 1st transaction + connection.beginTransaction(); + connection.write("1,val1".getBytes()); + connection.write("2,val2".getBytes()); + connection.commitTransaction(); + + // Replicate the committed data which should be visible. + WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " order by msg") + .verifyResults((new String[] {"val1", "val2"})); + + // Begin another transaction, write more records and commit 2nd transaction + connection.beginTransaction(); + connection.write("3,val3".getBytes()); + connection.write("4,val4".getBytes()); + + // Replicate events before committing txn. 
The uncommitted data shouldn't be seen. + incrDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " order by msg") + .verifyResults((new String[] {"val1", "val2"})); + + connection.commitTransaction(); + + // After commit, the data should be replicated and visible. + incrDump = primary.dump(primaryDbName, incrDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " order by msg") + .verifyResults((new String[] {"val1", "val2", "val3", "val4"})); + + // Begin another transaction, write more records and abort 3rd transaction + connection.beginTransaction(); + connection.write("5,val5".getBytes()); + connection.write("6,val6".getBytes()); + connection.abortTransaction(); + + // Aborted data shouldn't be visible. + incrDump = primary.dump(primaryDbName, incrDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " order by msg") + .verifyResults((new String[] {"val1", "val2", "val3", "val4"})); + + // Close the streaming connection + connection.close(); + } + + @Test + public void testHiveStreamingStaticPartitionWithTxnBatchSizeAsOne() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null); + replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation); + + // Create an ACID table. + String tblName = "alerts"; + primary.run("use " + primaryDbName) + .run("create table " + tblName + "( id int , msg string ) " + + "partitioned by (continent string, country string) " + + "clustered by (id) into 5 buckets " + + "stored as orc tblproperties(\"transactional\"=\"true\")"); + + // Static partition values + ArrayList partitionVals = new ArrayList(2); + partitionVals.add("Asia"); + partitionVals.add("India"); + + // Create delimited record writer whose schema exactly matches table schema + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + // Create and open streaming connection (default.src table has to exist already) + // By default, txn batch size is 1. + StreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(primaryDbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("example-agent-1") + .withRecordWriter(writer) + .withHiveConf(primary.getConf()) + .connect(); + + // Begin a transaction, write records and commit 1st transaction + connection.beginTransaction(); + connection.write("1,val1".getBytes()); + connection.write("2,val2".getBytes()); + connection.commitTransaction(); + + // Replicate the committed data which should be visible. + WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg") + .verifyResults((new String[] {"val1", "val2"})); + + // Begin another transaction, write more records and commit 2nd transaction + connection.beginTransaction(); + connection.write("3,val3".getBytes()); + connection.write("4,val4".getBytes()); + + // Replicate events before committing txn. 
The uncommitted data shouldn't be seen. + incrDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg") + .verifyResults((new String[] {"val1", "val2"})); + + connection.commitTransaction(); + + // After commit, the data should be replicated and visible. + incrDump = primary.dump(primaryDbName, incrDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg") + .verifyResults((new String[] {"val1", "val2", "val3", "val4"})); + + // Begin another transaction, write more records and abort 3rd transaction + connection.beginTransaction(); + connection.write("5,val5".getBytes()); + connection.write("6,val6".getBytes()); + connection.abortTransaction(); + + // Aborted data shouldn't be visible. + incrDump = primary.dump(primaryDbName, incrDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg") + .verifyResults((new String[] {"val1", "val2", "val3", "val4"})); + + // Close the streaming connection + connection.close(); + } + + @Test + public void testHiveStreamingDynamicPartitionWithTxnBatchSizeAsOne() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null); + replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation); + + // Create an ACID table. + String tblName = "alerts"; + primary.run("use " + primaryDbName) + .run("create table " + tblName + "( id int , msg string ) " + + "partitioned by (continent string, country string) " + + "clustered by (id) into 5 buckets " + + "stored as orc tblproperties(\"transactional\"=\"true\")"); + + // Dynamic partitioning + // Create delimited record writer whose schema exactly matches table schema + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + // Create and open streaming connection (default.src table has to exist already) + // By default, txn batch size is 1. + StreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(primaryDbName) + .withTable(tblName) + .withAgentInfo("example-agent-1") + .withRecordWriter(writer) + .withHiveConf(primary.getConf()) + .connect(); + + // Begin a transaction, write records and commit 1st transaction + connection.beginTransaction(); + + // Dynamic partition mode where last 2 columns are partition values + connection.write("11,val11,Asia,China".getBytes()); + connection.write("12,val12,Asia,India".getBytes()); + connection.commitTransaction(); + + // Replicate the committed data which should be visible. 
+ WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " where continent='Asia' and country='China' order by msg") + .verifyResults((new String[] {"val11"})) + .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg") + .verifyResults((new String[] {"val12"})); + + // Begin another transaction, write more records and commit 2nd transaction + connection.beginTransaction(); + connection.write("13,val13,Europe,Germany".getBytes()); + connection.write("14,val14,Asia,India".getBytes()); + + // Replicate events before committing txn. The uncommitted data shouldn't be seen. + incrDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg") + .verifyResults((new String[] {"val12"})); + + connection.commitTransaction(); + + // After committing the txn, the data should be visible. + incrDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg") + .verifyResults((new String[] {"val12", "val14"})) + .run("select msg from " + tblName + " where continent='Europe' and country='Germany' order by msg") + .verifyResults((new String[] {"val13"})); + + // Begin a transaction, write records and abort 3rd transaction + connection.beginTransaction(); + connection.write("15,val15,Asia,China".getBytes()); + connection.write("16,val16,Asia,India".getBytes()); + connection.abortTransaction(); + + // Aborted data should not be visible. + incrDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation) + .run("use " + replicatedDbName) + .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg") + .verifyResults((new String[] {"val12", "val14"})) + .run("select msg from " + tblName + " where continent='Asia' and country='China' order by msg") + .verifyResults((new String[] {"val11"})); + + // Close the streaming connection + connection.close(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 24fc0d5437..7ee6322d52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -266,6 +266,35 @@ public static String baseOrDeltaSubdir(boolean baseDirRequired, long min, long m } } + /** + * Return a base or delta directory path according to the given "options". + */ + public static Path baseOrDeltaSubdirPath(Path directory, AcidOutputFormat.Options options) { + String subdir; + if (options.isWritingBase()) { + subdir = BASE_PREFIX + String.format(DELTA_DIGITS, + options.getMaximumWriteId()); + } else if(options.getStatementId() == -1) { + //when minor compaction runs, we collapse per statement delta files inside a single + //transaction so we no longer need a statementId in the file name + subdir = options.isWritingDeleteDelta() ? 
+ deleteDeltaSubdir(options.getMinimumWriteId(), + options.getMaximumWriteId()) + : deltaSubdir(options.getMinimumWriteId(), + options.getMaximumWriteId()); + } else { + subdir = options.isWritingDeleteDelta() ? + deleteDeltaSubdir(options.getMinimumWriteId(), + options.getMaximumWriteId(), + options.getStatementId()) + : deltaSubdir(options.getMinimumWriteId(), + options.getMaximumWriteId(), + options.getStatementId()); + } + subdir = addVisibilitySuffix(subdir, options.getVisibilityTxnId()); + return new Path(directory, subdir); + } + /** * Create a filename for a bucket file. * @param directory the partition directory @@ -274,32 +303,12 @@ public static String baseOrDeltaSubdir(boolean baseDirRequired, long min, long m */ public static Path createFilename(Path directory, AcidOutputFormat.Options options) { - String subdir; if (options.getOldStyle()) { return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS, options.getBucketId()) + "_0"); - } else if (options.isWritingBase()) { - subdir = BASE_PREFIX + String.format(DELTA_DIGITS, - options.getMaximumWriteId()); - } else if(options.getStatementId() == -1) { - //when minor compaction runs, we collapse per statement delta files inside a single - //transaction so we no longer need a statementId in the file name - subdir = options.isWritingDeleteDelta() ? - deleteDeltaSubdir(options.getMinimumWriteId(), - options.getMaximumWriteId()) - : deltaSubdir(options.getMinimumWriteId(), - options.getMaximumWriteId()); } else { - subdir = options.isWritingDeleteDelta() ? - deleteDeltaSubdir(options.getMinimumWriteId(), - options.getMaximumWriteId(), - options.getStatementId()) - : deltaSubdir(options.getMinimumWriteId(), - options.getMaximumWriteId(), - options.getStatementId()); + return createBucketFile(baseOrDeltaSubdirPath(directory, options), options.getBucketId()); } - subdir = addVisibilitySuffix(subdir, options.getVisibilityTxnId()); - return createBucketFile(new Path(directory, subdir), options.getBucketId()); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 111cd1dc3f..55ae535df5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2362,7 +2362,8 @@ private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) { ((null != oldPart) || AcidUtils.isTransactionalTable(tbl)); } - public void listFilesInsideAcidDirectory(Path acidDir, FileSystem srcFs, List newFiles) throws IOException { + public static void listFilesInsideAcidDirectory(Path acidDir, FileSystem srcFs, List newFiles) + throws IOException { // list out all the files/directory in the path FileStatus[] acidFiles; acidFiles = srcFs.listStatus(acidDir); @@ -2370,6 +2371,7 @@ public void listFilesInsideAcidDirectory(Path acidDir, FileSystem srcFs, List partitionSpec "partition " + partitionSpec + " list of files " + newFiles); try { - FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf); Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); - - InsertEventRequestData insertData = new InsertEventRequestData(); - insertData.setReplace(true); - - WriteNotificationLogRequest rqst = new WriteNotificationLogRequest(txnId, writeId, - tbl.getDbName(), tbl.getTableName(), insertData); - addInsertFileInformation(newFiles, fileSystem, insertData); - + List partitionVals = null; if (partitionSpec != null && !partitionSpec.isEmpty()) { + partitionVals = new ArrayList<>(); for (FieldSchema 
fs : tbl.getPartitionKeys()) { - rqst.addToPartitionVals(partitionSpec.get(fs.getName())); + partitionVals.add(partitionSpec.get(fs.getName())); } } - getSynchronizedMSC().addWriteNotificationLog(rqst); + + addWriteNotificationLog(conf, tbl, partitionVals, txnId, writeId, newFiles); } catch (IOException | TException e) { throw new HiveException(e); } } + public static void addWriteNotificationLog(HiveConf conf, Table tbl, List partitionVals, + Long txnId, Long writeId, List newFiles) + throws IOException, HiveException, TException { + FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf); + InsertEventRequestData insertData = new InsertEventRequestData(); + insertData.setReplace(true); + + WriteNotificationLogRequest rqst = new WriteNotificationLogRequest(txnId, writeId, + tbl.getDbName(), tbl.getTableName(), insertData); + addInsertFileInformation(newFiles, fileSystem, insertData); + rqst.setPartitionVals(partitionVals); + + get(conf).getSynchronizedMSC().addWriteNotificationLog(rqst); + } + private void fireInsertEvent(Table tbl, Map partitionSpec, boolean replace, List newFiles) throws HiveException { if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) { diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 14d34d476e..03c9fe00a3 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -494,24 +494,28 @@ protected void checkAutoFlush() throws StreamingIOFailure { return addedPartitions; } - protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId, - Long maxWriteID) - throws IOException { + protected RecordUpdater createRecordUpdater(List partitionValues, final Path partitionPath, + int bucketId, Long minWriteId, Long maxWriteID) + throws IOException { // Initialize table properties from the table parameters. This is required because the table // may define certain table parameters that may be required while writing. The table parameter // 'transactional_properties' is one such example. Properties tblProperties = new Properties(); tblProperties.putAll(table.getParameters()); - return acidOutputFormat.getRecordUpdater(partitionPath, - new AcidOutputFormat.Options(conf) - .filesystem(fs) - .inspector(outputRowObjectInspector) - .bucket(bucketId) - .tableProperties(tblProperties) - .minimumWriteId(minWriteId) - .maximumWriteId(maxWriteID) - .statementId(statementId) - .finalDestination(partitionPath)); + + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .filesystem(fs) + .inspector(outputRowObjectInspector) + .bucket(bucketId) + .tableProperties(tblProperties) + .minimumWriteId(minWriteId) + .maximumWriteId(maxWriteID) + .statementId(statementId) + .finalDestination(partitionPath); + + // Add write directory information in the connection object. 
+ conn.addWriteDirectoryInfo(partitionValues, AcidUtils.baseOrDeltaSubdirPath(partitionPath, options)); + return acidOutputFormat.getRecordUpdater(partitionPath, options); } /** @@ -594,7 +598,8 @@ protected RecordUpdater getRecordUpdater(List partitionValues, int bucke } if (recordUpdater == null) { try { - recordUpdater = createRecordUpdater(destLocation, bucketId, curBatchMinWriteId, curBatchMaxWriteId); + recordUpdater = createRecordUpdater(partitionValues, destLocation, + bucketId, curBatchMinWriteId, curBatchMaxWriteId); } catch (IOException e) { String errMsg = "Failed creating RecordUpdater for " + getWatermark(destLocation.toString()); LOG.error(errMsg, e); diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index fa7e079331..13e6fc6441 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -146,7 +147,7 @@ public String toString() { private boolean manageTransactions; private int countTransactions = 0; private Set partitions; - private Long tableId; + private Map writePaths; private Runnable onShutdownRunner; private HiveStreamingConnection(Builder builder) throws StreamingException { @@ -161,6 +162,7 @@ private HiveStreamingConnection(Builder builder) throws StreamingException { this.tableObject = builder.tableObject; this.setPartitionedTable(builder.isPartitioned); this.manageTransactions = builder.manageTransactions; + this.writePaths = new HashMap<>(); UserGroupInformation loggedInUser = null; try { @@ -531,7 +533,7 @@ private void beginNextTransaction() throws StreamingException { if (currentTransactionBatch.remainingTransactions() == 0) { LOG.info("Transaction batch {} is done. Rolling over to next transaction batch.", currentTransactionBatch); - currentTransactionBatch.close(); + closeCurrentTransactionBatch(); currentTransactionBatch = createNewTransactionBatch(); LOG.info("Rolled over to new transaction batch {}", currentTransactionBatch); } @@ -568,6 +570,11 @@ private void checkState() throws StreamingException { } } + private void closeCurrentTransactionBatch() throws StreamingException { + currentTransactionBatch.close(); + writePaths.clear(); + } + @Override public void beginTransaction() throws StreamingException { checkClosedState(); @@ -644,7 +651,7 @@ public void close() { isConnectionClosed.set(true); try { if (currentTransactionBatch != null) { - currentTransactionBatch.close(); + closeCurrentTransactionBatch(); } } catch (StreamingException e) { LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e); @@ -686,6 +693,75 @@ private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String metasto } } + private class WriteDirInfo { + List partitionVals; + Path writeDir; + + WriteDirInfo(List partitionVals, Path writeDir) { + this.partitionVals = partitionVals; + this.writeDir = writeDir; + } + + List getPartitionVals() { + return this.partitionVals; + } + + Path getWriteDir() { + return this.writeDir; + } + } + + @Override + public void addWriteDirectoryInfo(List partitionValues, Path writeDir) { + String key = (partitionValues == null) ? 
tableObject.getFullyQualifiedName()
+        : partitionValues.toString();
+    if (!writePaths.containsKey(key)) {
+      writePaths.put(key, new WriteDirInfo(partitionValues, writeDir));
+    }
+  }
+
+  /**
+   * Add write notification events if DML event logging is enabled.
+   * @throws StreamingException File operation errors or HMS errors.
+   */
+  @Override
+  public void addWriteNotificationEvents() throws StreamingException {
+    if (!conf.getBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML)) {
+      LOG.debug("Write notification log is ignored as dml event logging is disabled.");
+      return;
+    }
+    try {
+      // Traverse the write paths for the current streaming connection and add one write notification
+      // event per table or partition.
+      // For a non-partitioned table, there will be only one entry in writePaths and the corresponding
+      // partitionVals is null.
+      Long txnId = getCurrentTxnId();
+      Long writeId = getCurrentWriteId();
+      for (WriteDirInfo writeInfo : writePaths.values()) {
+        LOG.debug("TxnId: " + txnId + ", WriteId: " + writeId
+            + " - Logging write event for the files in path " + writeInfo.getWriteDir());
+
+        // List the new files added inside the write path (delta directory).
+        FileSystem fs = tableObject.getDataLocation().getFileSystem(conf);
+        List<Path> newFiles = new ArrayList<>();
+        Hive.listFilesInsideAcidDirectory(writeInfo.getWriteDir(), fs, newFiles);
+
+        // If no files were added by this streaming write, then there is no need to log a write notification event.
+        if (newFiles.isEmpty()) {
+          LOG.debug("TxnId: " + txnId + ", WriteId: " + writeId
+              + " - Skipping empty path " + writeInfo.getWriteDir());
+          continue;
+        }
+
+        // Add write notification events into HMS table.
+        Hive.addWriteNotificationLog(conf, tableObject, writeInfo.getPartitionVals(),
+            txnId, writeId, newFiles);
+      }
+    } catch (IOException | TException | HiveException e) {
+      throw new StreamingException("Failed to log write notification events.", e);
+    }
+  }
+
   @VisibleForTesting
   TxnState getCurrentTransactionState() {
     return currentTransactionBatch.getCurrentTransactionState();
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index ba4c6a5aac..eb7c3c68bb 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -134,4 +134,20 @@ default Path getDeltaFileLocation(List<String> partitionValues,
       throws StreamingException {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Adds the information of the delta directory under which bucket files are created by the streaming write.
+   * Hive replication uses this information to log write events.
+   * @param partitionValues partition values
+   * @param writeDir Delta directory under which bucket files are written by streaming
+   */
+  default void addWriteDirectoryInfo(List<String> partitionValues, Path writeDir) {
+  }
+
+  /**
+   * Add write notification events if DML event logging is enabled.
+   * @throws StreamingException File operation errors or HMS errors.
+   */
+  default void addWriteNotificationEvents() throws StreamingException {
+  }
 }
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
index a625759c0b..4bff2c4616 100644
--- a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -285,6 +285,18 @@ private void commitImpl(Set<String> partitions, String key, String value)
           DataOperationType.INSERT);
       }
     }
+
+    // If it is the last transaction in the batch, then close the files and add write events.
+    // We need to close the writer as the file checksum can't be obtained on an open file.
+    if (currentTxnIndex + 1 >= txnToWriteIds.size()) {
+      // TODO: Replication doesn't work if txn batch size > 1 and the last txn is aborted, as write events
+      // are ignored by the abort txn event.
+      recordWriter.close();
+
+      // Add write notification events if it is enabled.
+      conn.addWriteNotificationEvents();
+    }
+
     transactionLock.lock();
     try {
       if (key != null) {