diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 643bcc4..39740ac 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -20,12 +20,14 @@ import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Scanner;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -52,12 +54,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.thrift.TException;
+import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 public abstract class AbstractRecordWriter implements RecordWriter {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
+  private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
   protected HiveConf conf;
   private StreamingConnection conn;
   protected Table tbl;
@@ -85,6 +89,11 @@ private AcidOutputFormat<?, ?> acidOutputFormat;
   private Long curBatchMinWriteId;
   private Long curBatchMaxWriteId;
+  private final String lineDelimiter;
+
+  public AbstractRecordWriter(final String lineDelimiter) {
+    this.lineDelimiter = StringUtils.isEmpty(lineDelimiter) ? DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter;
+  }
   @Override
   public void init(StreamingConnection conn, long minWriteId, long maxWriteId) throws StreamingException {
@@ -305,6 +314,15 @@ public void close() throws StreamingIOFailure {
   }
   @Override
+  public void write(final long writeId, final InputStream inputStream) throws StreamingException {
+    try (Scanner scanner = new Scanner(inputStream).useDelimiter(lineDelimiter)) {
+      while (scanner.hasNext()) {
+        write(writeId, scanner.next().getBytes());
+      }
+    }
+  }
+
+  @Override
   public void write(final long writeId, final byte[] record) throws StreamingException {
     try {
       Object encodedRow = encode(record);
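For reference, here is a minimal standalone sketch of the Scanner-based record splitting that the new write(long, InputStream) overload above relies on. It is illustrative only: the "\n" pattern and the sample payload are assumptions, while the writer itself falls back to the DEFAULT_LINE_DELIMITER_PATTERN of "[\r\n]" when no pattern is supplied.

    // Illustrative sketch, not part of the patch: each token produced by the Scanner
    // corresponds to one record handed to write(writeId, record.getBytes()).
    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.util.Scanner;

    public class ScannerSplitSketch {
      public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream("key1,val1\nkey2,val2".getBytes());
        try (Scanner scanner = new Scanner(in).useDelimiter("\n")) {   // assumed "\n" delimiter pattern
          while (scanner.hasNext()) {
            System.out.println(scanner.next());   // prints "key1,val1", then "key2,val2"
          }
        }
      }
    }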
connection.write("key3,val3".getBytes()); * connection.write("key4,val4".getBytes()); * connection.commitTransaction(); - * * // close the streaming connection * connection.close(); * } @@ -485,12 +482,6 @@ private void checkState() throws StreamingException { } @Override - public void write(final byte[] record) throws StreamingException { - checkState(); - currentTransactionBatch.write(record); - } - - @Override public void beginTransaction() throws StreamingException { checkClosedState(); beginNextTransaction(); @@ -509,6 +500,21 @@ public void abortTransaction() throws StreamingException { } @Override + public void write(final byte[] record) throws StreamingException { + checkState(); + currentTransactionBatch.write(record); + } + + @Override + public void write(final InputStream inputStream) throws StreamingException { + checkState(); + currentTransactionBatch.write(inputStream); + } + + /** + * Close connection + */ + @Override public void close() { if (isConnectionClosed.get()) { return; @@ -767,6 +773,24 @@ public void write(final byte[] record) throws StreamingException { } } + public void write(final InputStream inputStream) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), inputStream); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won'table help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } finally { + markDead(success); + } + } + private void checkIsClosed() throws StreamingException { if (isTxnClosed.get()) { throw new StreamingException("Transaction" + toString() + " is closed()"); diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java index 4d25924..d9c4455 100644 --- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java @@ -19,6 +19,8 @@ package org.apache.hive.streaming; +import java.io.InputStream; + import java.util.Set; public interface RecordWriter { @@ -34,15 +36,28 @@ void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException; /** - * Writes using a hive RecordUpdater + * Writes using a hive RecordUpdater. * - * @param writeId the write ID of the table mapping to Txn in which the write occurs - * @param record the record to be written + * @param writeId - the write ID of the table mapping to Txn in which the write occurs + * @param record - the record to be written + * @throws StreamingException - thrown when write fails */ void write(long writeId, byte[] record) throws StreamingException; /** + * Writes using a hive RecordUpdater. The specified input stream will be automatically closed + * by the API after reading all the records out of it. + * + * @param writeId - the write ID of the table mapping to Txn in which the write occurs + * @param inputStream - the record to be written + * @throws StreamingException - thrown when write fails + */ + void write(long writeId, InputStream inputStream) throws StreamingException; + + /** * Flush records from buffer. 
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
index 4d25924..d9c4455 100644
--- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -19,6 +19,8 @@ package org.apache.hive.streaming;
+import java.io.InputStream;
+
 import java.util.Set;
 public interface RecordWriter {
@@ -34,15 +36,28 @@ void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException;
   /**
-   * Writes using a hive RecordUpdater
+   * Writes using a hive RecordUpdater.
    *
-   * @param writeId the write ID of the table mapping to Txn in which the write occurs
-   * @param record the record to be written
+   * @param writeId - the write ID of the table mapping to Txn in which the write occurs
+   * @param record - the record to be written
+   * @throws StreamingException - thrown when write fails
    */
   void write(long writeId, byte[] record) throws StreamingException;
   /**
+   * Writes using a hive RecordUpdater. The specified input stream will be automatically closed
+   * by the API after reading all the records out of it.
+   *
+   * @param writeId - the write ID of the table mapping to Txn in which the write occurs
+   * @param inputStream - the input stream of records to be written
+   * @throws StreamingException - thrown when write fails
+   */
+  void write(long writeId, InputStream inputStream) throws StreamingException;
+
+  /**
   * Flush records from buffer. Invoked by TransactionBatch.commitTransaction()
+   *
+   * @throws StreamingException - thrown when flush fails
   */
   void flush() throws StreamingException;
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index cd7f3d8..0027dbe 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -18,6 +18,8 @@ package org.apache.hive.streaming;
+import java.io.InputStream;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
@@ -44,6 +46,14 @@ void write(byte[] record) throws StreamingException;
   /**
+   * Write records from an input stream using RecordWriter.
+   *
+   * @param inputStream - input stream of records
+   * @throws StreamingException - if there are errors when writing
+   */
+  void write(InputStream inputStream) throws StreamingException;
+
+  /**
    * Commit a transaction to make the writes visible for readers.
    *
    * @throws StreamingException - if there are errors when committing the open transaction
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
index 4a07435..c55b4d2 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
@@ -19,7 +19,9 @@ package org.apache.hive.streaming;
+import java.io.InputStream;
 import java.util.Properties;
+import java.util.Scanner;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -45,6 +47,7 @@ private LazySimpleSerDe serde;
   private StrictDelimitedInputWriter(Builder builder) {
+    super(builder.lineDelimiter);
     this.fieldDelimiter = builder.fieldDelimiter;
     this.collectionDelimiter = builder.collectionDelimiter;
     this.mapKeyDelimiter = builder.mapKeyDelimiter;
@@ -58,6 +61,7 @@ public static Builder newBuilder() {
     private char fieldDelimiter = (char) LazySerDeParameters.DefaultSeparators[0];
     private char collectionDelimiter = (char) LazySerDeParameters.DefaultSeparators[1];
     private char mapKeyDelimiter = (char) LazySerDeParameters.DefaultSeparators[2];
+    private String lineDelimiter;
     public Builder withFieldDelimiter(final char fieldDelimiter) {
       this.fieldDelimiter = fieldDelimiter;
       return this;
     }
@@ -74,6 +78,11 @@ public Builder withMapKeyDelimiter(final char mapKeyDelimiter) {
       return this;
     }
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictDelimitedInputWriter build() {
       return new StrictDelimitedInputWriter(this);
     }
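One point worth calling out about the builder knob added above: the argument is a java.util.regex pattern handed to Scanner.useDelimiter(), not a literal string, and when it is left unset the writer falls back to the "[\r\n]" default from AbstractRecordWriter. A brief illustrative fragment (the escaped '|' delimiter here is an assumption mirroring the JSON test later in this patch):

    // Illustrative only: split incoming streams on '|' rather than on newlines.
    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .withLineDelimiterPattern("\\|")   // regex pattern, so the pipe character must be escaped
        .build();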
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 1600e7c..1d18095 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -35,13 +35,24 @@ public class StrictJsonWriter extends AbstractRecordWriter {
   private JsonSerDe serde;
+  public StrictJsonWriter(final Builder builder) {
+    super(builder.lineDelimiter);
+  }
+
   public static Builder newBuilder() {
     return new Builder();
   }
   public static class Builder {
+    private String lineDelimiter;
+
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictJsonWriter build() {
-      return new StrictJsonWriter();
+      return new StrictJsonWriter(this);
     }
   }
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index 563cf66..a864a44 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -39,6 +39,7 @@ private RegexSerDe serde;
   private StrictRegexWriter(final Builder builder) {
+    super(builder.lineDelimiter);
     this.regex = builder.regex;
   }
@@ -48,12 +49,18 @@ public static Builder newBuilder() {
   public static class Builder {
     private String regex;
+    private String lineDelimiter;
     public Builder withRegex(final String regex) {
       this.regex = regex;
       return this;
     }
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictRegexWriter build() {
       return new StrictRegexWriter(this);
     }
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index a6fdd66..042fdbe 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -20,11 +20,13 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -468,6 +470,60 @@ public void testAllTypesDelimitedWriter() throws Exception {
   }
   @Test
+  public void testAllTypesDelimitedWriterInputStream() throws Exception {
+    queryTable(driver, "drop table if exists default.alltypes");
+    queryTable(driver,
+        "create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, " +
+        "f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), " +
+        "m map<string,string>, l array<int>, st struct<c1:int,c2:string> ) " +
+        "stored as orc TBLPROPERTIES('transactional'='true')");
+    StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter('|')
+        .withCollectionDelimiter(',')
+        .withMapKeyDelimiter(':')
+        .withLineDelimiterPattern("\n")
+        .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase("default")
+        .withTable("alltypes")
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withTransactionBatchSize(2)
+        .withRecordWriter(wr)
+        .withHiveConf(conf)
+        .connect();
+
+    String row1 = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 " +
+        "15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo";
+    String row2 = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|" +
+        "k4:v4|200,300|20,bar";
+    String allRows = row1 + "\n" + row2 + "\n";
+    ByteArrayInputStream bais = new ByteArrayInputStream(allRows.getBytes());
+    connection.beginTransaction();
+    connection.write(bais);
+    connection.commitTransaction();
+    connection.close();
+    bais.close();
+
+    List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st," +
+        " INPUT__FILE__NAME from default.alltypes order by ROW__ID");
+    Assert.assertEquals(2, rs.size());
+    String gotRow1 = rs.get(0);
+    String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
+        "\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring" +
+        "\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
+    String expectedSuffixRow1 = "alltypes/delta_0000001_0000002/bucket_00000";
+    String gotRow2 = rs.get(1);
+    String expectedPrefixRow2 = "{\"writeid\":1,\"bucketid\":536870912," +
+        "\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd" +
+        "\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}";
+    String expectedSuffixRow2 = "alltypes/delta_0000001_0000002/bucket_00000";
+    Assert.assertTrue(gotRow1, gotRow1.startsWith(expectedPrefixRow1));
+    Assert.assertTrue(gotRow1, gotRow1.endsWith(expectedSuffixRow1));
+    Assert.assertTrue(gotRow2, gotRow2.startsWith(expectedPrefixRow2));
+    Assert.assertTrue(gotRow2, gotRow2.endsWith(expectedSuffixRow2));
+  }
+
+  @Test
   public void testAutoRollTransactionBatch() throws Exception {
     queryTable(driver, "drop table if exists default.streamingnobuckets");
     queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc " +
@@ -1226,6 +1282,37 @@ public void testTransactionBatchCommitRegex() throws Exception {
   }
   @Test
+  public void testRegexInputStream() throws Exception {
+    String regex = "([^,]*),(.*)";
+    StrictRegexWriter writer = StrictRegexWriter.newBuilder()
+        // if no line delimiter pattern is specified, the default "[\r\n]" is used to split records
+        .withRegex(regex)
+        .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(partitionVals)
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withHiveConf(conf)
+        .withRecordWriter(writer)
+        .connect();
+
+    String rows = "1,foo\r2,bar\r3,baz";
+    ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes());
+    connection.beginTransaction();
+    connection.write(bais);
+    connection.commitTransaction();
+    bais.close();
+    connection.close();
+
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
+    Assert.assertEquals(3, rs.size());
+    Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tbar\tAsia\tIndia", rs.get(1));
+    Assert.assertEquals("3\tbaz\tAsia\tIndia", rs.get(2));
+  }
+
+  @Test
   public void testTransactionBatchCommitJson() throws Exception {
     StrictJsonWriter writer = StrictJsonWriter.newBuilder()
         .build();
@@ -1261,6 +1348,37 @@ public void testTransactionBatchCommitJson() throws Exception {
   }
   @Test
+  public void testJsonInputStream() throws Exception {
+    StrictJsonWriter writer = StrictJsonWriter.newBuilder()
+        .withLineDelimiterPattern("\\|")
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(partitionVals)
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
+
+    // 1st Txn
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState());
+    String records = "{\"id\" : 1, \"msg\": \"Hello streaming\"}|{\"id\" : 2, \"msg\": \"Hello world\"}|{\"id\" : 3, " +
+        "\"msg\": \"Hello world!!\"}";
+    ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes());
+    connection.write(bais);
+    connection.commitTransaction();
+    bais.close();
+    connection.close();
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
+    Assert.assertEquals(3, rs.size());
+    Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tHello world\tAsia\tIndia", rs.get(1));
+    Assert.assertEquals("3\tHello world!!\tAsia\tIndia", rs.get(2));
+  }
+
+  @Test
   public void testRemainingTransactions() throws Exception {
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
         .withFieldDelimiter(',')
         .build();
@@ -2797,6 +2915,12 @@ public void write(long writeId, byte[] record) throws StreamingException {
     }
     @Override
+    public void write(final long writeId, final InputStream inputStream) throws StreamingException {
+      delegate.write(writeId, inputStream);
+      produceFault();
+    }
+
+    @Override
     public void flush() throws StreamingException {
       delegate.flush();
       produceFault();