diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 45bdb24..6de91f3 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -20,12 +20,14 @@
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Scanner;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.thrift.TException;
+import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +57,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
 
+  private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
   protected HiveConf conf;
   private StreamingConnection conn;
   Table tbl;
@@ -84,6 +88,11 @@ private AcidOutputFormat<?, ?> outf;
   private Long curBatchMinWriteId;
   private Long curBatchMaxWriteId;
 
+  private final String lineDelimiter;
+
+  public AbstractRecordWriter(final String lineDelimiter) {
+    this.lineDelimiter = StringUtils.isEmpty(lineDelimiter) ? DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter;
+  }
 
   @Override
   public void init(StreamingConnection conn) throws StreamingException {
@@ -341,6 +350,15 @@ public void closeBatch() throws StreamingIOFailure {
   }
 
   @Override
+  public void write(final long writeId, final InputStream inputStream) throws StreamingException {
+    try (Scanner scanner = new Scanner(inputStream).useDelimiter(lineDelimiter)) {
+      while (scanner.hasNext()) {
+        write(writeId, scanner.next().getBytes());
+      }
+    }
+  }
+
+  @Override
   public void write(final long writeId, final byte[] record) throws StreamingException {
     try {
       Object encodedRow = encode(record);
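[Reviewer note, for illustration only, not part of the patch] The new overload tokenizes the stream with java.util.Scanner, so a "record" is whatever falls between matches of the delimiter pattern: the builder-supplied pattern, or the "[\r\n]" default. A minimal standalone sketch of that splitting behavior, with made-up input and a hypothetical class name:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.util.Scanner;

    public class ScannerSplitDemo {
      public static void main(String[] args) {
        // Same default pattern the patch uses: split on CR or LF.
        InputStream in = new ByteArrayInputStream("1,foo\n2,bar\n3,baz".getBytes());
        try (Scanner scanner = new Scanner(in).useDelimiter("[\r\n]")) {
          while (scanner.hasNext()) {
            // AbstractRecordWriter hands each token to write(writeId, bytes).
            System.out.println(scanner.next()); // prints 1,foo then 2,bar then 3,baz
          }
        }
      }
    }

Note that scanner.next().getBytes() in the patch encodes each record with the platform default charset, as the sketch does.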
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 3f173f9..6760bfb 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -19,6 +19,7 @@
 package org.apache.hive.streaming;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -612,6 +613,12 @@ public void write(final Collection<byte[]> records) throws StreamingException {
     currentTransactionBatch.write(records);
   }
 
+  @Override
+  public void write(final InputStream inputStream) throws StreamingException {
+    Preconditions.checkNotNull(currentTransactionBatch, "Transaction cannot be null. Missing beginTransaction()?");
+    currentTransactionBatch.write(inputStream);
+  }
+
   /**
    * Close connection
    */
@@ -885,13 +892,34 @@ public void write(final Collection<byte[]> records) throws StreamingException {
     }
   }
 
-  private void writeImpl(Collection<byte[]> records)
-      throws StreamingException {
+  @Override
+  public void write(final InputStream inputStream) throws StreamingException {
+    checkIsClosed();
+    boolean success = false;
+    try {
+      writeImpl(inputStream);
+      success = true;
+    } catch (SerializationError ex) {
+      // this exception indicates that a {@code record} could not be parsed and the
+      // caller can decide whether to drop it or send it to a dead letter queue.
+      // rolling back the txn and retrying won't help since the tuple will be exactly
+      // the same when it's replayed.
+      success = true;
+      throw ex;
+    } finally {
+      markDead(success);
+    }
+  }
+
+  private void writeImpl(final Collection<byte[]> records) throws StreamingException {
     for (byte[] record : records) {
       recordWriter.write(getCurrentWriteId(), record);
     }
   }
 
+  private void writeImpl(final InputStream inputStream) throws StreamingException {
+    recordWriter.write(getCurrentWriteId(), inputStream);
+  }
+
   @Override
   public void commit() throws StreamingException {
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
index ca226d7..4b96284 100644
--- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -19,6 +19,8 @@
 package org.apache.hive.streaming;
 
+import java.io.InputStream;
+
 public interface RecordWriter {
 
   /**
@@ -29,15 +31,27 @@ void init(StreamingConnection connection) throws StreamingException;
 
   /**
-   * Writes using a hive RecordUpdater
+   * Writes using a hive RecordUpdater.
    *
-   * @param writeId the write ID of the table mapping to Txn in which the write occurs
-   * @param record the record to be written
+   * @param writeId - the write ID of the table mapping to Txn in which the write occurs
+   * @param record - the record to be written
+   * @throws StreamingException - thrown when write fails
    */
   void write(long writeId, byte[] record) throws StreamingException;
 
   /**
+   * Writes using a hive RecordUpdater.
+   *
+   * @param writeId - the write ID of the table mapping to Txn in which the write occurs
+   * @param inputStream - the input stream of records to be written
+   * @throws StreamingException - thrown when write fails
+   */
+  void write(long writeId, InputStream inputStream) throws StreamingException;
+
+  /**
   * Flush records from buffer. Invoked by TransactionBatch.commit()
+   *
+   * @throws StreamingException - thrown when flush fails
   */
  void flush() throws StreamingException;
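[Reviewer note, for illustration only, not part of the patch] Putting the connection-level pieces together, a hedged sketch of how a caller might drive the new write(InputStream) path end to end. The database/table ("default.events"), the sample rows, and the class name are illustrative assumptions; the builder calls and transaction methods are the ones used elsewhere in this patch and its tests.

    import java.io.ByteArrayInputStream;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.streaming.HiveStreamingConnection;
    import org.apache.hive.streaming.SerializationError;
    import org.apache.hive.streaming.StreamingConnection;
    import org.apache.hive.streaming.StreamingException;
    import org.apache.hive.streaming.StrictDelimitedInputWriter;

    public class StreamWriteSketch {
      public static void streamRows(HiveConf conf) throws StreamingException {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .withLineDelimiterPattern("\n")
            .build();
        StreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase("default")
            .withTable("events")              // hypothetical table
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();
        try {
          connection.beginNextTransaction();
          connection.write(new ByteArrayInputStream("1,foo\n2,bar\n".getBytes()));
          connection.commit();
        } catch (SerializationError e) {
          // Per the comment in write(): the record is unparseable and replaying the
          // txn would fail the same way, so drop the payload or dead-letter it.
          throw e;
        } finally {
          connection.close();
        }
      }
    }

The pass-through catch is deliberate: the patch keeps the connection alive on SerializationError (success stays true) precisely so the caller can make that choice.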
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
index 4a07435..c55b4d2 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
@@ -19,7 +19,9 @@
 package org.apache.hive.streaming;
 
+import java.io.InputStream;
 import java.util.Properties;
+import java.util.Scanner;
 
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -45,6 +47,7 @@ private LazySimpleSerDe serde;
 
   private StrictDelimitedInputWriter(Builder builder) {
+    super(builder.lineDelimiter);
     this.fieldDelimiter = builder.fieldDelimiter;
     this.collectionDelimiter = builder.collectionDelimiter;
     this.mapKeyDelimiter = builder.mapKeyDelimiter;
@@ -58,6 +61,7 @@ public static Builder newBuilder() {
     private char fieldDelimiter = (char) LazySerDeParameters.DefaultSeparators[0];
     private char collectionDelimiter = (char) LazySerDeParameters.DefaultSeparators[1];
     private char mapKeyDelimiter = (char) LazySerDeParameters.DefaultSeparators[2];
+    private String lineDelimiter;
 
     public Builder withFieldDelimiter(final char fieldDelimiter) {
       this.fieldDelimiter = fieldDelimiter;
@@ -74,6 +78,11 @@ public Builder withMapKeyDelimiter(final char mapKeyDelimiter) {
       return this;
     }
 
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictDelimitedInputWriter build() {
       return new StrictDelimitedInputWriter(this);
     }
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 1600e7c..1d18095 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -35,13 +35,24 @@ public class StrictJsonWriter extends AbstractRecordWriter {
 
   private JsonSerDe serde;
 
+  public StrictJsonWriter(final Builder builder) {
+    super(builder.lineDelimiter);
+  }
+
   public static Builder newBuilder() {
     return new Builder();
   }
 
   public static class Builder {
+    private String lineDelimiter;
+
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictJsonWriter build() {
-      return new StrictJsonWriter();
+      return new StrictJsonWriter(this);
     }
   }
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index 563cf66..a864a44 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -39,6 +39,7 @@ private RegexSerDe serde;
 
   private StrictRegexWriter(final Builder builder) {
+    super(builder.lineDelimiter);
     this.regex = builder.regex;
   }
 
@@ -48,12 +49,18 @@ public static Builder newBuilder() {
 
   public static class Builder {
     private String regex;
+    private String lineDelimiter;
 
     public Builder withRegex(final String regex) {
       this.regex = regex;
       return this;
     }
 
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictRegexWriter build() {
       return new StrictRegexWriter(this);
     }
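[Reviewer note, for illustration only, not part of the patch] All three writers now thread the builder's pattern into the AbstractRecordWriter constructor, where it becomes a java.util.Scanner delimiter regex (null or empty falls back to "[\r\n]"). A small sketch of the two configuration styles; the regex and pattern values are the ones used in the tests below, and the class name is hypothetical:

    import org.apache.hive.streaming.StrictJsonWriter;
    import org.apache.hive.streaming.StrictRegexWriter;

    public class WriterBuilderSketch {
      public static void main(String[] args) {
        // Explicit pattern: pipe-separated JSON documents. Since the pattern is a
        // regex, '|' must be escaped.
        StrictJsonWriter jsonWriter = StrictJsonWriter.newBuilder()
            .withLineDelimiterPattern("\\|")
            .build();

        // No pattern supplied: the default "[\r\n]" is used for record splitting.
        // withRegex() parses fields within a record and is unrelated to splitting.
        StrictRegexWriter regexWriter = StrictRegexWriter.newBuilder()
            .withRegex("([^,]*),(.*)")
            .build();
      }
    }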
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
index eb85c0e..868212c 100644
--- a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -19,6 +19,7 @@
 package org.apache.hive.streaming;
 
+import java.io.InputStream;
 import java.util.Collection;
 
 /**
@@ -103,11 +104,20 @@ public String toString() {
   /**
    * Write records using RecordWriter.
    *
+   * @param records - collection of records.
    * @throws StreamingException - if there are errors when writing
    */
  void write(Collection<byte[]> records) throws StreamingException;
 
   /**
+   * Write records using RecordWriter.
+   *
+   * @param inputStream - input stream of records
+   * @throws StreamingException - if there are errors when writing
+   */
+  void write(InputStream inputStream) throws StreamingException;
+
+  /**
   * Close the TransactionBatch.
   *
   * @throws StreamingException - if there are errors closing batch
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index eb4e27f..a74d639 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -20,11 +20,13 @@
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -460,6 +462,60 @@ public void testAllTypesDelimitedWriter() throws Exception {
   }
 
   @Test
+  public void testAllTypesDelimitedWriterInputStream() throws Exception {
+    queryTable(driver, "drop table if exists default.alltypes");
+    queryTable(driver,
+      "create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, " +
+        "f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), " +
+        "m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " +
+        "stored as orc TBLPROPERTIES('transactional'='true')");
+    StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter('|')
+      .withCollectionDelimiter(',')
+      .withMapKeyDelimiter(':')
+      .withLineDelimiterPattern("\n")
+      .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase("default")
+      .withTable("alltypes")
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withTransactionBatchSize(2)
+      .withRecordWriter(wr)
+      .withHiveConf(conf)
+      .connect();
+
+    String row1 = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 " +
+      "15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo";
+    String row2 = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|" +
+      "k4:v4|200,300|20,bar";
+    String allRows = row1 + "\n" + row2 + "\n";
+    ByteArrayInputStream bais = new ByteArrayInputStream(allRows.getBytes());
+    connection.beginNextTransaction();
+    connection.write(bais);
+    connection.commit();
+    connection.close();
+    bais.close();
+
+    List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st," +
+      " INPUT__FILE__NAME from default.alltypes order by ROW__ID");
+
+    Assert.assertEquals(2, rs.size());
+    String gotRow1 = rs.get(0);
+    String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
+      "\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring" +
+      "\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
+    String expectedSuffixRow1 = "alltypes/delta_0000001_0000002/bucket_00000";
+    String gotRow2 = rs.get(1);
+    String expectedPrefixRow2 = "{\"writeid\":1,\"bucketid\":536870912," +
+      "\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd" +
+      "\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}";
+    String expectedSuffixRow2 = "alltypes/delta_0000001_0000002/bucket_00000";
+    Assert.assertTrue(gotRow1, gotRow1.startsWith(expectedPrefixRow1));
+    Assert.assertTrue(gotRow1, gotRow1.endsWith(expectedSuffixRow1));
+    Assert.assertTrue(gotRow2, gotRow2.startsWith(expectedPrefixRow2));
+    Assert.assertTrue(gotRow2, gotRow2.endsWith(expectedSuffixRow2));
+  }
+
+  @Test
   public void testAutoRollTransactionBatch() throws Exception {
     queryTable(driver, "drop table if exists default.streamingnobuckets");
     queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc " +
@@ -1202,6 +1258,37 @@ public void testTransactionBatchCommit_Regex() throws Exception {
   }
 
   @Test
+  public void testRegexInputStream() throws Exception {
+    String regex = "([^,]*),(.*)";
+    StrictRegexWriter writer = StrictRegexWriter.newBuilder()
+      // if the line delimiter pattern is unspecified, the default [\r\n] is used to split records
+      .withRegex(regex)
+      .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withHiveConf(conf)
+      .withRecordWriter(writer)
+      .connect();
+
+    String rows = "1,foo\r2,bar\r3,baz";
+    ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes());
+    connection.beginNextTransaction();
+    connection.write(bais);
+    connection.commit();
+    bais.close();
+    connection.close();
+
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
+    Assert.assertEquals(3, rs.size());
+    Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tbar\tAsia\tIndia", rs.get(1));
+    Assert.assertEquals("3\tbaz\tAsia\tIndia", rs.get(2));
+  }
+
+  @Test
   public void testTransactionBatchCommit_Json() throws Exception {
     StrictJsonWriter writer = StrictJsonWriter.newBuilder()
       .build();
@@ -1236,6 +1323,38 @@ public void testTransactionBatchCommit_Json() throws Exception {
   }
 
   @Test
+  public void testJsonInputStream() throws Exception {
+    StrictJsonWriter writer = StrictJsonWriter.newBuilder()
+      .withLineDelimiterPattern("\\|")
+      .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    // 1st Txn
+    connection.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN, connection.getCurrentTransactionState());
+    String records = "{\"id\" : 1, \"msg\": \"Hello streaming\"}|{\"id\" : 2, \"msg\": \"Hello world\"}|{\"id\" : 3, " +
+      "\"msg\": \"Hello world!!\"}";
+    ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes());
+    connection.write(bais);
+    connection.commit();
+    bais.close();
+    connection.close();
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
+    Assert.assertEquals(3, rs.size());
+    Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tHello world\tAsia\tIndia", rs.get(1));
+    Assert.assertEquals("3\tHello world!!\tAsia\tIndia", rs.get(2));
+  }
+
+  @Test
   public void testRemainingTransactions() throws Exception {
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')
@@ -2630,6 +2749,12 @@ public void write(long writeId, byte[] record) throws StreamingException {
     }
 
     @Override
+    public void write(final long writeId, final InputStream inputStream) throws StreamingException {
+      delegate.write(writeId, inputStream);
+      produceFault();
+    }
+
+    @Override
     public void flush() throws StreamingException {
       delegate.flush();
       produceFault();