diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
index 40b2e8e..1119fa2 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
@@ -178,7 +178,7 @@ private void populateRecord(List<Object> r, JsonToken token, JsonParser p, Struc
     if (token != JsonToken.FIELD_NAME) {
       throw new IOException("Field name expected");
     }
-    String fieldName = p.getText();
+    String fieldName = p.getText().toLowerCase();
     int fpos = s.getAllStructFieldNames().indexOf(fieldName);
     if (fpos == -1) {
       fpos = getPositionFromHiveInternalColumnName(fieldName);
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 0f9260d..1d8fdff 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -899,6 +899,10 @@ private void abort(final boolean abortAllRemaining) throws StreamingException {
   }
 
   private void abortImpl(boolean abortAllRemaining) throws StreamingException {
+    if (minTxnId == null) {
+      return;
+    }
+
     transactionLock.lock();
     try {
       if (abortAllRemaining) {
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 0f9b652..cabb64c 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -20,11 +20,14 @@
 
 import java.util.Properties;
 
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.JsonSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.base.Joiner;
+
 /**
  * Streaming Writer handles utf8 encoded Json (Strict syntax).
  * Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input
@@ -64,6 +67,8 @@ public StrictJsonWriter build() {
   public JsonSerDe createSerde() throws SerializationError {
     try {
       Properties tableProps = table.getMetadata();
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+      tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
       JsonSerDe serde = new JsonSerDe();
       SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
       this.serde = serde;
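Note on the three hunks above: JsonSerDe lowercases incoming JSON field names so they match Hive's lowercased column names, abortImpl() returns early when minTxnId is null (there is no transaction batch to abort yet), and StrictJsonWriter now hands the serde an explicit column list and type list through the standard serde table properties. A minimal, standalone sketch of that property wiring, assuming only Guava on the classpath; the column names and types below are made up for illustration, not taken from the patch:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import com.google.common.base.Joiner;

public class SerdePropsSketch {
  public static void main(String[] args) {
    // Illustrative column metadata; in the patch these come from the
    // writer's inputColumns/inputTypes fields.
    List<String> inputColumns = Arrays.asList("id", "msg", "continent", "country");
    List<String> inputTypes = Arrays.asList("int", "string", "string", "string");

    Properties tableProps = new Properties();
    // serdeConstants.LIST_COLUMNS / LIST_COLUMN_TYPES resolve to these keys;
    // columns are comma-separated, types colon-separated.
    tableProps.setProperty("columns", Joiner.on(",").join(inputColumns));
    tableProps.setProperty("columns.types", Joiner.on(":").join(inputTypes));

    // Prints:
    //   columns=id,msg,continent,country
    //   columns.types=int:string:string:string
    tableProps.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}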
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index 3651fa1..12516f5 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -18,15 +18,20 @@
 
 package org.apache.hive.streaming;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.RegexSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
 /**
  * Streaming Writer handles text input data with regex. Uses
  * org.apache.hadoop.hive.serde2.RegexSerDe
@@ -75,7 +80,17 @@ public RegexSerDe createSerde() throws SerializationError {
     try {
       Properties tableProps = table.getMetadata();
       tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
-      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(inputColumns, ","));
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+      tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
+      final String columnComments = tableProps.getProperty("columns.comments");
+      if (columnComments != null) {
+        List<String> comments = Lists.newArrayList(Splitter.on('\0').split(columnComments));
+        int commentsSize = comments.size();
+        for (int i = 0; i < inputColumns.size() - commentsSize; i++) {
+          comments.add("");
+        }
+        tableProps.setProperty("columns.comments", Joiner.on('\0').join(comments));
+      }
       RegexSerDe serde = new RegexSerDe();
       SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
       this.serde = serde;
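The columns.comments handling above pads the '\0'-separated comment string with empty entries until there is one entry per input column, since the table metadata can carry fewer stored comments than the writer has columns. A standalone sketch of just that padding step, again assuming Guava; the four-column, two-comment input is made up for illustration:

import java.util.List;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

public class CommentPaddingSketch {
  public static void main(String[] args) {
    // Made-up inputs: four columns, but only two comments stored.
    int columnCount = 4;
    String columnComments = "row id\0message body";

    // Same logic as the patch: split on '\0', pad with empty strings
    // until sizes match, then rejoin with '\0'.
    List<String> comments = Lists.newArrayList(Splitter.on('\0').split(columnComments));
    int commentsSize = comments.size();
    for (int i = 0; i < columnCount - commentsSize; i++) {
      comments.add("");
    }
    String padded = Joiner.on('\0').join(comments);

    System.out.println(comments.size());            // 4
    System.out.println(padded.replace('\0', '|'));  // row id|message body||
  }
}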
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
index e513915..32a6d06 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -579,6 +580,82 @@ public void testWriteBeforeBegin() throws Exception {
   }
 
   @Test
+  public void testRegexInputStreamDP() throws Exception {
+    String regex = "([^,]*),(.*),(.*),(.*)";
+    StrictRegexWriter writer = StrictRegexWriter.newBuilder()
+        // if unspecified, the default line-break pattern [\r\n] will be used
+        .withRegex(regex)
+        .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withHiveConf(conf)
+        .withRecordWriter(writer)
+        .connect();
+
+    String rows = "1,foo,Asia,India\r2,bar,Europe,Germany\r3,baz,Asia,China\r4,cat,Australia,";
+    ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes());
+    connection.beginTransaction();
+    connection.write(bais);
+    connection.commitTransaction();
+    bais.close();
+    connection.close();
+
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName + " order by id");
+    Assert.assertEquals(4, rs.size());
+    Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tbar\tEurope\tGermany", rs.get(1));
+    Assert.assertEquals("3\tbaz\tAsia\tChina", rs.get(2));
+    Assert.assertEquals("4\tcat\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3));
+    rs = queryTable(driver, "show partitions " + dbName + "." + tblName);
+    Assert.assertEquals(4, rs.size());
+    Assert.assertTrue(rs.contains("continent=Asia/country=India"));
+    Assert.assertTrue(rs.contains("continent=Asia/country=China"));
+    Assert.assertTrue(rs.contains("continent=Europe/country=Germany"));
+    Assert.assertTrue(rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__"));
+  }
+
+  @Test
+  public void testJsonInputStreamDP() throws Exception {
+    StrictJsonWriter writer = StrictJsonWriter.newBuilder()
+        .withLineDelimiterPattern("\\|")
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
+
+    // 1st Txn
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState());
+    String records = "{\"id\" : 1, \"msg\": \"Hello streaming\", \"continent\": \"Asia\", \"Country\": \"India\"}|" +
+        "{\"id\" : 2, \"msg\": \"Hello world\", \"continent\": \"Europe\", \"Country\": \"Germany\"}|" +
+        "{\"id\" : 3, \"msg\": \"Hello world!!\", \"continent\": \"Asia\", \"Country\": \"China\"}|" +
+        "{\"id\" : 4, \"msg\": \"Hmm..\", \"continent\": \"Australia\", \"Unknown-field\": \"whatever\"}|";
+    ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes());
+    connection.write(bais);
+    connection.commitTransaction();
+    bais.close();
+    connection.close();
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName + " order by id");
+    Assert.assertEquals(4, rs.size());
+    Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tHello world\tEurope\tGermany", rs.get(1));
+    Assert.assertEquals("3\tHello world!!\tAsia\tChina", rs.get(2));
+    Assert.assertEquals("4\tHmm..\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3));
+    rs = queryTable(driver, "show partitions " + dbName + "." + tblName);
+    Assert.assertEquals(4, rs.size());
+    Assert.assertTrue(rs.contains("continent=Asia/country=India"));
+    Assert.assertTrue(rs.contains("continent=Asia/country=China"));
+    Assert.assertTrue(rs.contains("continent=Europe/country=Germany"));
+    Assert.assertTrue(rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__"));
+  }
+
+  @Test
   public void testWriteAfterClose() throws Exception {
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
         .withFieldDelimiter(',')
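For readers unfamiliar with the API, a condensed sketch of the InputStream write path these two tests exercise, outside JUnit. It assumes a reachable metastore, an existing transactional table, and a populated HiveConf; the database and table names are placeholders, not names from the patch:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StrictJsonWriter;

public class JsonInputStreamSketch {
  static void streamJson(HiveConf conf) throws Exception {
    StrictJsonWriter writer = StrictJsonWriter.newBuilder()
        .withLineDelimiterPattern("\\|")   // records are '|'-separated
        .build();
    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("mydb")              // placeholder
        .withTable("mytable")              // placeholder
        .withRecordWriter(writer)
        .withHiveConf(conf)
        .connect();
    try (InputStream in = new ByteArrayInputStream(
        "{\"id\": 1, \"msg\": \"hello\"}|{\"id\": 2, \"msg\": \"world\"}"
            .getBytes(StandardCharsets.UTF_8))) {
      connection.beginTransaction();
      connection.write(in);                // split on the configured delimiter pattern
      connection.commitTransaction();
    } finally {
      connection.close();
    }
  }
}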