diff --git kafka-handler/README.md kafka-handler/README.md index 706c77ae25..11b893c5d2 100644 --- kafka-handler/README.md +++ kafka-handler/README.md @@ -126,8 +126,8 @@ left join wiki_kafka_hive as future_activity on | hive.kafka.poll.timeout.ms | Parameter indicating Kafka Consumer poll timeout period in millis. FYI this is independent from internal Kafka consumer timeouts. | No | 5000 (5 Seconds) | | hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 | | hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata. | No | 30000 (30 Seconds) | -| kafka.write.semantic | Writer semantic, allowed values (BEST_EFFORT, AT_LEAST_ONCE, EXACTLY_ONCE) | No | AT_LEAST_ONCE | -| hive.kafka.optimistic.commit | Boolean value indicate the if the producer should commit during task or delegate the commit to HS2. | No | true | +| kafka.write.semantic | Writer semantics, allowed values (AT_LEAST_ONCE, EXACTLY_ONCE) | No | AT_LEAST_ONCE | + ### Setting Extra Consumer/Producer properties. User can inject custom Kafka consumer/producer properties via the Table properties. @@ -213,5 +213,5 @@ Then insert data into the table. Keep in mind that Kafka is an append only, thus ```sql insert into table moving_avg_wiki_kafka_hive select `channel`, `namespace`, `page`, `timestamp`, avg(delta) over (order by `timestamp` asc rows between 60 preceding and current row) as avg_delta, -null as `__key`, null as `__partition`, -1, -1,-1, -1 from l15min_wiki; +null as `__key`, null as `__partition`, -1, -1 from l15min_wiki; ``` diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java index 950f7315c2..1ddda8e699 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java @@ -49,18 +49,15 @@ Properties tableProperties, Progressable progress) { final String topic = jc.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName()); - final Boolean optimisticCommit = jc.getBoolean(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName(), true); + final Boolean optimisticCommit = jc.getBoolean(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName(), false); final WriteSemantic writeSemantic = WriteSemantic.valueOf(jc.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName())); final Properties producerProperties = KafkaUtils.producerProperties(jc); final FileSinkOperator.RecordWriter recordWriter; switch (writeSemantic) { - case BEST_EFFORT: - recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), false, producerProperties); - break; case AT_LEAST_ONCE: - recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), true, producerProperties); + recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), producerProperties); break; case EXACTLY_ONCE: FileSystem fs; @@ -98,11 +95,6 @@ * Possible write semantic supported by the Record Writer. */ enum WriteSemantic { - /** - * Best effort delivery with no guarantees at all, user can set Producer properties as they wish, - * will carry on when possible unless it is a fatal exception. - */ - BEST_EFFORT, /** * Deliver all the record at least once unless the job fails. * Therefore duplicates can be introduced due to lost ACKs or Tasks retries. 
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java index 746de61273..7f8353c9f0 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java @@ -50,8 +50,6 @@ private long consumedRecords = 0L; private long readBytes = 0L; private volatile boolean started = false; - private long startOffset = -1L; - private long endOffset = Long.MAX_VALUE; @SuppressWarnings("WeakerAccess") public KafkaRecordReader() { } @@ -75,13 +73,11 @@ private void initConsumer() { private synchronized void initialize(KafkaInputSplit inputSplit, Configuration jobConf) { if (!started) { this.config = jobConf; - startOffset = inputSplit.getStartOffset(); - endOffset = inputSplit.getEndOffset(); + long startOffset = inputSplit.getStartOffset(); + long endOffset = inputSplit.getEndOffset(); TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition()); Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset, - "Start [%s] has to be positive and less or equal than End [%s]", - startOffset, - endOffset); + "Start [%s] has to be positive and less than or equal to End [%s]", startOffset, endOffset); totalNumberRecords += endOffset - startOffset; initConsumer(); long @@ -103,7 +99,7 @@ private synchronized void initialize(KafkaInputSplit inputSplit, Configuration j @Override public boolean next(NullWritable nullWritable, KafkaWritable bytesWritable) { if (started && recordsCursor.hasNext()) { ConsumerRecord record = recordsCursor.next(); - bytesWritable.set(record, startOffset, endOffset); + bytesWritable.set(record); consumedRecords += 1; readBytes += record.serializedValueSize(); return true; diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java index 51cfa24929..6b2ca1056e 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java @@ -360,12 +360,12 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { } private static class TextBytesConverter implements BytesConverter { - Text text = new Text(); + private final Text text = new Text(); @Override public byte[] getBytes(Text writable) { //@TODO There is no reason to decode then encode the string to bytes really //@FIXME this issue with CTRL-CHAR ^0 added by Text at the end of string and Json serd does not like that.
try { - return writable.decode(writable.getBytes(), 0, writable.getLength()).getBytes(Charset.forName("UTF-8")); + return Text.decode(writable.getBytes(), 0, writable.getLength()).getBytes(Charset.forName("UTF-8")); } catch (CharacterCodingException e) { throw new RuntimeException(e); } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java index 0d64cd9c9c..d87f245776 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -172,6 +173,9 @@ private void configureCommonProperties(TableDesc tableDesc, Map properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers); properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, Utilities.getTaskId(getConf())); + if (UserGroupInformation.isSecurityEnabled()) { + KafkaUtils.addKerberosJaasConf(getConf(), properties); + } table.getParameters() .entrySet() .stream() @@ -197,6 +201,9 @@ private Properties buildProducerProperties(Table table) { properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers); + if (UserGroupInformation.isSecurityEnabled()) { + KafkaUtils.addKerberosJaasConf(getConf(), properties); + } table.getParameters() .entrySet() .stream() diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java index 2e1f6faf1f..a4ad01a008 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java @@ -49,6 +49,7 @@ * {@link KafkaOutputFormat.WriteSemantic}. */ WRITE_SEMANTIC_PROPERTY("kafka.write.semantic", KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE.name()), + /** * Table property that indicates if we should commit within the task or delay it to the Metadata Hook Commit call. 
*/ diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java index 6ae9c8d276..81252c5936 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java @@ -24,8 +24,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.ReflectionUtil; import org.apache.kafka.clients.CommonClientConfigs; @@ -40,6 +43,8 @@ import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; @@ -55,6 +60,9 @@ * Utils class for Kafka Storage handler plus some Constants. */ final class KafkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); + private static final String JAAS_TEMPLATE = "com.sun.security.auth.module.Krb5LoginModule required " + + "useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";"; private KafkaUtils() { } @@ -103,6 +111,10 @@ static Properties consumerProperties(Configuration configuration) { props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + // add the Kerberos JAAS config when security is enabled + if (UserGroupInformation.isSecurityEnabled()) { + addKerberosJaasConf(configuration, props); + } // user can always override stuff props.putAll(extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX)); return props; @@ -131,18 +143,21 @@ static Properties producerProperties(Configuration configuration) { + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName()); } properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint); + // add the Kerberos JAAS config when security is enabled + if (UserGroupInformation.isSecurityEnabled()) { + addKerberosJaasConf(configuration, properties); + } + // user can always override stuff properties.putAll(extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX)); String taskId = configuration.get("mapred.task.id", null); properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, taskId == null ? "random_" + UUID.randomUUID().toString() : taskId); switch (writeSemantic) { - case BEST_EFFORT: - break; case AT_LEAST_ONCE: properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE)); //The number of acknowledgments the producer requires the leader to have received before considering a request as - // complete, all means from all replicas. + //complete. Here all means from all replicas. properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); break; case EXACTLY_ONCE: @@ -252,4 +267,37 @@ static String getTaskId(Configuration hiveConf) { return id; } + /** + * Helper method that adds Kerberos JAAS configs to the properties.
+ * @param configuration Hive config containing the Kerberos keytab and principal + * @param props properties to be populated + */ + static void addKerberosJaasConf(Configuration configuration, Properties props) { + // based on https://kafka.apache.org/documentation/#security_jaas_client + props.setProperty("security.protocol", "SASL_PLAINTEXT"); + props.setProperty("sasl.mechanism", "GSSAPI"); + props.setProperty("sasl.kerberos.service.name", "kafka"); + + // construct the principal/keytab + String principalHost = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL); + String keyTab = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); + // fall back to the LLAP keytab/principal if the HS2 configs are not set or not visible to the task + if (principalHost == null || principalHost.isEmpty() || keyTab == null || keyTab.isEmpty()) { + keyTab = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_FS_KERBEROS_KEYTAB_FILE); + principalHost = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_FS_KERBEROS_PRINCIPAL); + } + + String principal; + try { + principal = SecurityUtil.getServerPrincipal(principalHost, "0.0.0.0"); + } catch (IOException e) { + LOG.error("Cannot construct Kerberos principal", e); + throw new RuntimeException(e); + } + final String jaasConf = String.format(JAAS_TEMPLATE, keyTab, principal); + props.setProperty("sasl.jaas.config", jaasConf); + LOG.info("Kafka client running with the following JAAS config [{}]", jaasConf); + } + + } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java index 681b666fdf..ccf413b53a 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java @@ -34,8 +34,7 @@ /** * Writable implementation of Kafka ConsumerRecord. * Serialized in the form: - * {@code timestamp} long| {@code partition} (int) | {@code offset} (long) | - * {@code startOffset} (long) | {@code endOffset} (long) | {@code value.size()} (int) | + * {@code timestamp} (long) | {@code partition} (int) | {@code offset} (long) | {@code value.size()} (int) | * {@code value} (byte []) | {@code recordKey.size()}| {@code recordKey (byte [])} */ public class KafkaWritable implements Writable { @@ -46,43 +45,24 @@ private byte[] value; private byte[] recordKey; - /** - * Fist offset given by the input split used to pull the event {@link KafkaInputSplit#getStartOffset()}. - */ - private long startOffset; - /** - * Last Offset given by the input split used to pull the event {@link KafkaInputSplit#getEndOffset()}.
- */ - private long endOffset; - - void set(ConsumerRecord consumerRecord, long startOffset, long endOffset) { + void set(ConsumerRecord consumerRecord) { this.partition = consumerRecord.partition(); this.timestamp = consumerRecord.timestamp(); this.offset = consumerRecord.offset(); this.value = consumerRecord.value(); this.recordKey = consumerRecord.key(); - this.startOffset = startOffset; - this.endOffset = endOffset; } - KafkaWritable(int partition, - long offset, - long timestamp, - byte[] value, - long startOffset, - long endOffset, - @Nullable byte[] recordKey) { + KafkaWritable(int partition, long offset, long timestamp, byte[] value, @Nullable byte[] recordKey) { this.partition = partition; this.offset = offset; this.timestamp = timestamp; this.value = value; this.recordKey = recordKey; - this.startOffset = startOffset; - this.endOffset = endOffset; } KafkaWritable(int partition, long timestamp, byte[] value, @Nullable byte[] recordKey) { - this(partition, -1, timestamp, value, -1, -1, recordKey); + this(partition, -1, timestamp, value, recordKey); } @SuppressWarnings("WeakerAccess") public KafkaWritable() { @@ -92,8 +72,6 @@ void set(ConsumerRecord consumerRecord, long startOffset, long e dataOutput.writeLong(timestamp); dataOutput.writeInt(partition); dataOutput.writeLong(offset); - dataOutput.writeLong(startOffset); - dataOutput.writeLong(endOffset); dataOutput.writeInt(value.length); dataOutput.write(value); if (recordKey != null) { @@ -108,8 +86,6 @@ void set(ConsumerRecord consumerRecord, long startOffset, long e timestamp = dataInput.readLong(); partition = dataInput.readInt(); offset = dataInput.readLong(); - startOffset = dataInput.readLong(); - endOffset = dataInput.readLong(); int dataSize = dataInput.readInt(); if (dataSize > 0) { value = new byte[dataSize]; @@ -142,14 +118,6 @@ long getTimestamp() { return value; } - @SuppressWarnings("WeakerAccess") long getStartOffset() { - return startOffset; - } - - @SuppressWarnings("WeakerAccess") long getEndOffset() { - return endOffset; - } - @Nullable byte[] getRecordKey() { return recordKey; } @@ -164,15 +132,13 @@ long getTimestamp() { KafkaWritable writable = (KafkaWritable) o; return partition == writable.partition && offset == writable.offset - && startOffset == writable.startOffset - && endOffset == writable.endOffset && timestamp == writable.timestamp && Arrays.equals(value, writable.value) && Arrays.equals(recordKey, writable.recordKey); } @Override public int hashCode() { - int result = Objects.hash(partition, offset, startOffset, endOffset, timestamp); + int result = Objects.hash(partition, offset, timestamp); result = 31 * result + Arrays.hashCode(value); result = 31 * result + Arrays.hashCode(recordKey); return result; @@ -184,10 +150,6 @@ long getTimestamp() { + partition + ", offset=" + offset - + ", startOffset=" - + startOffset - + ", endOffset=" - + endOffset + ", timestamp=" + timestamp + ", value=" @@ -207,10 +169,6 @@ Writable getHiveWritable(MetadataColumn metadataColumn) { return new LongWritable(getTimestamp()); case KEY: return getRecordKey() == null ? 
null : new BytesWritable(getRecordKey()); - case START_OFFSET: - return new LongWritable(getStartOffset()); - case END_OFFSET: - return new LongWritable(getEndOffset()); default: throw new IllegalArgumentException("Unknown metadata column [" + metadataColumn.getName() + "]"); } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java index 60e1aea55d..15d5340c5c 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java @@ -60,24 +60,14 @@ /** * Record Timestamp column name, added as extra meta column of type long. */ - TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo), - /** - * Start offset given by the input split, this will reflect the actual start of TP or start given by split pruner. - */ - // @TODO To be removed next PR it is here to make review easy - START_OFFSET("__start_offset", TypeInfoFactory.longTypeInfo), - /** - * End offset given by input split at run time. - */ - // @TODO To be removed next PR it is here to make review easy - END_OFFSET("__end_offset", TypeInfoFactory.longTypeInfo); + TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo); /** * Kafka metadata columns list that indicates the order of appearance for each column in final row. */ private static final List KAFKA_METADATA_COLUMNS = - Arrays.asList(KEY, PARTITION, OFFSET, TIMESTAMP, START_OFFSET, END_OFFSET); + Arrays.asList(KEY, PARTITION, OFFSET, TIMESTAMP); static final List KAFKA_METADATA_INSPECTORS = diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java index c95bdb02de..678e190b3f 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java @@ -63,7 +63,7 @@ private final String topic; private final String writerId; - private final KafkaOutputFormat.WriteSemantic writeSemantic; + private final KafkaOutputFormat.WriteSemantic writeSemantic = KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE; private final KafkaProducer producer; private final Callback callback; private final AtomicReference sendExceptionRef = new AtomicReference<>(); @@ -73,12 +73,9 @@ /** * @param topic Kafka Topic. * @param writerId Writer Id use for logging. - * @param atLeastOnce true if the desired delivery semantic is at least once. * @param properties Kafka Producer properties. */ - SimpleKafkaWriter(String topic, @Nullable String writerId, boolean atLeastOnce, Properties properties) { - this.writeSemantic = - atLeastOnce ? KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE : KafkaOutputFormat.WriteSemantic.BEST_EFFORT; + SimpleKafkaWriter(String topic, @Nullable String writerId, Properties properties) { this.writerId = writerId == null ?
UUID.randomUUID().toString() : writerId; this.topic = Preconditions.checkNotNull(topic, "Topic can not be null"); Preconditions.checkState(properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null, @@ -88,21 +85,13 @@ this.callback = (metadata, exception) -> { if (exception != null) { lostRecords.getAndIncrement(); - switch (writeSemantic) { - case BEST_EFFORT: - LOG.warn(ACTION_CARRY_ON, getWriterId(), topic, writeSemantic); - break; - case AT_LEAST_ONCE: - LOG.error(ACTION_ABORT, getWriterId(), topic, writeSemantic, exception.getMessage()); - sendExceptionRef.compareAndSet(null, exception); - break; - default: - throw new IllegalArgumentException("Unsupported delivery semantic " + writeSemantic); - } + LOG.error(ACTION_ABORT, getWriterId(), topic, writeSemantic, exception.getMessage()); + sendExceptionRef.compareAndSet(null, exception); } }; LOG.info("Starting WriterId [{}], Delivery Semantic [{}], Target Kafka Topic [{}]", - writerId, writeSemantic, + writerId, + writeSemantic, topic); } @@ -126,12 +115,8 @@ private void handleKafkaException(KafkaException kafkaException) { LOG.error(String.format(ABORT_MSG, writerId, kafkaException.getMessage(), topic, -1L)); sendExceptionRef.compareAndSet(null, kafkaException); } else { - if (writeSemantic == KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE) { - LOG.error(ACTION_ABORT, writerId, topic, writeSemantic, kafkaException.getMessage()); - sendExceptionRef.compareAndSet(null, kafkaException); - } else { - LOG.warn(ACTION_CARRY_ON, writerId, topic, writeSemantic); - } + LOG.error(ACTION_ABORT, writerId, topic, writeSemantic, kafkaException.getMessage()); + sendExceptionRef.compareAndSet(null, kafkaException); } } diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java index 3d3f598bc0..ff345f9196 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java @@ -166,10 +166,7 @@ public KafkaRecordIteratorTest(String currentTopic, .map((consumerRecord) -> new KafkaWritable(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), - consumerRecord.value(), - 50L, - 100L, - consumerRecord.key())) + consumerRecord.value(), consumerRecord.key())) .collect(Collectors.toList()); KafkaRecordReader recordReader = new KafkaRecordReader(); TaskAttemptContext context = new TaskAttemptContextImpl(this.conf, new TaskAttemptID()); diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java index 8aebb9254e..640b24e82d 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java @@ -67,9 +67,7 @@ public KafkaUtilsTest() { @Test public void testMetadataEnumLookupMapper() { int partition = 1; long offset = 5L; - long ts = System.currentTimeMillis(); - long startOffset = 0L; - long endOffset = 200L; + long ts = 1000000L; byte[] value = "value".getBytes(); byte[] key = "key".getBytes(); // ORDER MATTERS here. 
@@ -78,10 +76,8 @@ public KafkaUtilsTest() { Arrays.asList(new BytesWritable(key), new IntWritable(partition), new LongWritable(offset), - new LongWritable(ts), - new LongWritable(startOffset), - new LongWritable(endOffset)); - KafkaWritable kafkaWritable = new KafkaWritable(partition, offset, ts, value, startOffset, endOffset, key); + new LongWritable(ts)); + KafkaWritable kafkaWritable = new KafkaWritable(partition, offset, ts, value, key); List actual = diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java index 45bf7912c4..73d185e506 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java @@ -42,10 +42,7 @@ public KafkaWritableTest() { new KafkaWritable(record.partition(), record.offset(), record.timestamp(), - record.value(), - 0L, - 100L, - null); + record.value(), null); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream w = new DataOutputStream(baos); kafkaWritable.write(w); @@ -65,10 +62,7 @@ public KafkaWritableTest() { new KafkaWritable(record.partition(), record.offset(), record.timestamp(), - record.value(), - 0L, - 100L, - "thisKey".getBytes()); + record.value(), "thisKey".getBytes()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream w = new DataOutputStream(baos); kafkaWritable.write(w); @@ -85,10 +79,7 @@ public KafkaWritableTest() { KafkaWritable kafkaWritable = new KafkaWritable(5, 1000L, 1L, - "value".getBytes(), - 0L, - 10000L, - "key".getBytes()); + "value".getBytes(), "key".getBytes()); Arrays.stream(MetadataColumn.values()).forEach(kafkaWritable::getHiveWritable); } } diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java index d8168e02a0..8a9bbc7f66 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java @@ -71,8 +71,7 @@ public SimpleKafkaWriterTest(KafkaOutputFormat.WriteSemantic writeSemantic) { } @Parameterized.Parameters public static Iterable data() { - return Arrays.asList(new Object[][] {{KafkaOutputFormat.WriteSemantic.BEST_EFFORT}, - {KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE}}); + return Arrays.asList(new Object[][] {{KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE}}); } @BeforeClass public static void setupCluster() throws Throwable { @@ -110,9 +109,7 @@ private void setupConsumer() { @Test(expected = IllegalStateException.class) public void testMissingBrokerString() { new SimpleKafkaWriter("t", - null, - writeSemantic.equals(KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE), - new Properties()); + null, new Properties()); } @Test public void testCheckWriterId() { @@ -121,9 +118,7 @@ private void setupConsumer() { SimpleKafkaWriter writer = new SimpleKafkaWriter("t", - null, - writeSemantic.equals(KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE), - properties); + null, properties); Assert.assertNotNull(writer.getWriterId()); } @@ -135,12 +130,7 @@ private void setupConsumer() { properties.setProperty("metadata.max.age.ms", "100"); properties.setProperty("max.block.ms", "1000"); KafkaWritable record = new KafkaWritable(-1, -1, "value".getBytes(), null); - SimpleKafkaWriter writer = new SimpleKafkaWriter("t", null, false, properties); - writer.write(record); - 
writer.close(false); - Assert.assertEquals("Expect sent records not matching", 1, writer.getSentRecords()); - Assert.assertEquals("Expect lost records is not matching", 1, writer.getLostRecords()); - writer = new SimpleKafkaWriter("t", null, true, properties); + SimpleKafkaWriter writer = new SimpleKafkaWriter("t", null, properties); Exception exception = null; try { writer.write(record); @@ -160,9 +150,7 @@ private void setupConsumer() { SimpleKafkaWriter writer = new SimpleKafkaWriter(topic, - null, - writeSemantic.equals(KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE), - properties); + null, properties); RECORDS_WRITABLES.forEach(kafkaRecordWritable -> { try { writer.write(kafkaRecordWritable); diff --git ql/src/test/queries/clientpositive/kafka_storage_handler.q ql/src/test/queries/clientpositive/kafka_storage_handler.q index 595f0320b6..e6cd276f95 100644 --- ql/src/test/queries/clientpositive/kafka_storage_handler.q +++ ql/src/test/queries/clientpositive/kafka_storage_handler.q @@ -14,32 +14,32 @@ TBLPROPERTIES DESCRIBE EXTENDED kafka_table; -Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +Select `__partition` , `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table; Select count(*) FROM kafka_table; -Select `__partition`, `__offset`,`__start_offset`,`__end_offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where `__timestamp` > 1533960760123; -Select `__partition`, `__offset` ,`__start_offset`,`__end_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +Select `__partition`, `__offset` ,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where `__timestamp` > 533960760123; -Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR `__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0); -Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5; +Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5; -Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5; +Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5; -Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5; +Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5; -- Timestamp filter -Select 
`__partition`,`__start_offset`,`__end_offset`, `__offset`, `user` from kafka_table where +Select `__partition`, `__offset`, `user` from kafka_table where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS) ; -- non existing partition @@ -232,7 +232,8 @@ TBLPROPERTIES describe extended wiki_kafka_avro_table; -select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table; +select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, + `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table; select count(*) from wiki_kafka_avro_table; @@ -241,7 +242,7 @@ select count(distinct `user`) from wiki_kafka_avro_table; select sum(deltabucket), min(commentlength) from wiki_kafka_avro_table; select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, -`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, +`__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090; @@ -255,11 +256,11 @@ TBLPROPERTIES "kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe") ; -insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`) -values ('test1',5, 4.999,'key',null ,-1,1536449552290,-1,null ); +insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`) +values ('test1',5, 4.999,'key',null ,-1,1536449552290); -insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`) -values ('test2',15, 14.9996666, null ,null ,-1,1536449552285,-1,-1 ); +insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) +values ('test2',15, 14.9996666, null ,null ,-1,1536449552285); select * from kafka_table_insert; @@ -268,10 +269,11 @@ insert into table wiki_kafka_avro_table select isrobot as isrobot, channel as channel,`timestamp` as `timestamp`, flags as flags, isunpatrolled as isunpatrolled, page as page, diffurl as diffurl, added as added, comment as comment, commentlength as commentlength, isnew as isnew, isminor as isminor, delta as delta, isanonymous as isanonymous, `user` as `user`, deltabucket as detlabucket, deleted as deleted, namespace as namespace, -`__key`, `__partition`, -1 as `__offset`,`__timestamp`, -1 as `__start_offset`, -1 as `__end_offset` +`__key`, `__partition`, -1 as `__offset`,`__timestamp` from wiki_kafka_avro_table; -select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table; +select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, +`page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table; select `__key`, count(1) FROM wiki_kafka_avro_table group by `__key` order by `__key`; @@ -287,12 +289,12 @@ TBLPROPERTIES "kafka.serde.class"="org.apache.hadoop.hive.serde2.OpenCSVSerde"); ALTER TABLE 
kafka_table_csv SET TBLPROPERTIES ("hive.kafka.optimistic.commit"="false", "kafka.write.semantic"="EXACTLY_ONCE"); -insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp`, -1, -1 from kafka_table_insert; +insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp` from kafka_table_insert; -insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`) -values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291,-1,null ); +insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`) +values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291); -insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`) -values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284,-1,-1 ); +insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) +values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284); select * from kafka_table_csv; \ No newline at end of file diff --git ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out index 73f0f293d9..8ea2aa9d3a 100644 --- ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out +++ ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out @@ -48,32 +48,30 @@ __key binary from deserializer __partition int from deserializer __offset bigint from deserializer __timestamp bigint from deserializer -__start_offset bigint from deserializer -__end_offset bigint from deserializer #### A masked pattern was here #### StorageHandlerInfo Partition(topic = test-topic, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [10]] -PREHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +PREHOOK: query: Select `__partition` , `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +POSTHOOK: query: Select `__partition` , `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -0 0 10 0 key NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 -0 0 10 1 key NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 0 10 2 key NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 -0 0 10 3 key NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -0 0 10 4 key NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 -0 
0 10 5 key NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 -0 0 10 6 key NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 0 10 7 key NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 -0 0 10 8 key NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -0 0 10 9 key NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 0 key NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 key NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 key NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 key NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 key NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 key NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 key NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 key NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 key NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 key NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 PREHOOK: query: Select count(*) FROM kafka_table PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table @@ -83,121 +81,121 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### 10 -PREHOOK: query: Select `__partition`, `__offset`,`__start_offset`,`__end_offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +PREHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where `__timestamp` > 1533960760123 PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition`, `__offset`,`__start_offset`,`__end_offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +POSTHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where `__timestamp` > 1533960760123 POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -0 0 0 10 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 -0 1 0 10 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 2 0 10 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 -0 3 0 10 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -0 4 0 10 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 -0 5 0 10 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 -0 6 0 10 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 7 0 10 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 
12 111 -0 8 0 10 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -0 9 0 10 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 -PREHOOK: query: Select `__partition`, `__offset` ,`__start_offset`,`__end_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select `__partition`, `__offset` ,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where `__timestamp` > 533960760123 PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition`, `__offset` ,`__start_offset`,`__end_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +POSTHOOK: query: Select `__partition`, `__offset` ,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where `__timestamp` > 533960760123 POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -0 0 0 10 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 -0 1 0 10 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 2 0 10 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 -0 3 0 10 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -0 4 0 10 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 -0 5 0 10 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 -0 6 0 10 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 7 0 10 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 -0 8 0 10 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -0 9 0 10 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 -PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha 
masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR `__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0) PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +POSTHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR `__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0) POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -0 1 9 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 1 9 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 -0 1 9 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 +POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -key 0 5 6 5 NULL Gypsy Danger nuclear -PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 +key 0 5 NULL Gypsy Danger nuclear +PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 PREHOOK: type: QUERY PREHOOK: Input: 
default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 +POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -key 0 0 5 0 NULL Gypsy Danger nuclear -key 0 0 5 1 NULL Striker Eureka speed -key 0 0 5 2 NULL Cherno Alpha masterYi -key 0 0 5 3 NULL Crimson Typhoon triplets -key 0 0 5 4 NULL Coyote Tango stringer -PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 +key 0 0 NULL Gypsy Danger nuclear +key 0 1 NULL Striker Eureka speed +key 0 2 NULL Cherno Alpha masterYi +key 0 3 NULL Crimson Typhoon triplets +key 0 4 NULL Coyote Tango stringer +PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 +POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -key 0 6 10 6 NULL Striker Eureka speed -key 0 6 10 7 NULL Cherno Alpha masterYi -key 0 6 10 8 NULL Crimson Typhoon triplets -key 0 6 10 9 NULL Coyote Tango stringer -PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`, `user` from kafka_table where +key 0 6 NULL Striker Eureka speed +key 0 7 NULL Cherno Alpha masterYi +key 0 8 NULL Crimson Typhoon triplets +key 0 9 NULL Coyote Tango stringer +PREHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS) PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`, `user` from kafka_table where +POSTHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS) POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -0 0 10 0 nuclear -0 0 10 1 speed -0 0 10 2 masterYi -0 0 10 3 triplets -0 0 10 4 stringer -0 0 10 5 nuclear -0 0 10 6 speed -0 0 10 7 masterYi -0 0 10 8 triplets -0 0 10 9 stringer +0 0 nuclear +0 1 speed +0 2 masterYi +0 3 triplets +0 4 stringer +0 5 nuclear +0 6 speed +0 7 masterYi +0 8 triplets +0 9 stringer PREHOOK: query: Select count(*) from kafka_table where `__partition` = 1 PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table @@ -635,17 +633,17 @@ POSTHOOK: query: SELECT * FROM wiki_kafka_avro_table_1 POSTHOOK: type: QUERY POSTHOOK: Input: default@wiki_kafka_avro_table_1 POSTHOOK: Output: hdfs://### HDFS PATH ### -key-0 0 0 1534736225090 0 11 -key-1 0 1 1534739825090 0 11 -key-2 0 2 1534743425090 0 11 -key-3 0 3 1534747025090 0 11 -key-4 0 4 1534750625090 0 11 -key-5 0 5 1534754225090 0 11 -key-6 0 6 1534757825090 0 11 -key-7 0 7 1534761425090 0 11 -key-8 0 8 1534765025090 0 11 -key-9 0 9 
1534768625090	0	11
-key-10	0	10	1534772225090	0	11
+key-0	0	0	1534736225090
+key-1	0	1	1534739825090
+key-2	0	2	1534743425090
+key-3	0	3	1534747025090
+key-4	0	4	1534750625090
+key-5	0	5	1534754225090
+key-6	0	6	1534757825090
+key-7	0	7	1534761425090
+key-8	0	8	1534765025090
+key-9	0	9	1534768625090
+key-10	0	10	1534772225090
PREHOOK: query: SELECT COUNT (*) from wiki_kafka_avro_table_1
PREHOOK: type: QUERY
PREHOOK: Input: default@wiki_kafka_avro_table_1
@@ -825,17 +823,17 @@
__key	binary	from deserializer
__partition	int	from deserializer
__offset	bigint	from deserializer
__timestamp	bigint	from deserializer
-__start_offset	bigint	from deserializer
-__end_offset	bigint	from deserializer
#### A masked pattern was here ####
StorageHandlerInfo
Partition(topic = wiki_kafka_avro_table, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [11]]
-PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
+PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+ `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
PREHOOK: type: QUERY
PREHOOK: Input: default@wiki_kafka_avro_table
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
+POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+ `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
POSTHOOK: type: QUERY
POSTHOOK: Input: default@wiki_kafka_avro_table
POSTHOOK: Output: hdfs://### HDFS PATH ###
@@ -878,23 +876,23 @@
POSTHOOK: Input: default@wiki_kafka_avro_table
POSTHOOK: Output: hdfs://### HDFS PATH ###
5522.000000000001	0
PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
`isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090
PREHOOK: type: QUERY
PREHOOK: Input: default@wiki_kafka_avro_table
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
`isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090
POSTHOOK: type: QUERY
POSTHOOK: Input: default@wiki_kafka_avro_table
POSTHOOK: Output: hdfs://### HDFS PATH ###
-2018-08-20 08:37:05.09	1534754225090	0	5	11	key-5	5	08/20/2018 01:37:05	test-user-5	page is 500	-5	502.0	true	5
-2018-08-20 09:37:05.09	1534757825090	0	5	11	key-6	6	08/20/2018 02:37:05	test-user-6	page is 600	-6	602.4000000000001	false	6
-2018-08-20 10:37:05.09	1534761425090	0	5	11	key-7	7	08/20/2018 03:37:05	test-user-7	page is 700	-7	702.8000000000001	true	7
-2018-08-20 11:37:05.09	1534765025090	0	5	11	key-8	8	08/20/2018 04:37:05	test-user-8	page is 800	-8	803.2	true	8
-2018-08-20 12:37:05.09	1534768625090	0	5	11	key-9	9	08/20/2018 05:37:05	test-user-9	page is 900	-9	903.6	false	9
-2018-08-20 13:37:05.09	1534772225090	0	5	11	key-10	10	08/20/2018 06:37:05	test-user-10	page is 1000	-10	1004.0	true	10
+2018-08-20 08:37:05.09	1534754225090	0	key-5	5	08/20/2018 01:37:05	test-user-5	page is 500	-5	502.0	true	5
+2018-08-20 09:37:05.09	1534757825090	0	key-6	6	08/20/2018 02:37:05	test-user-6	page is 600	-6	602.4000000000001	false	6
+2018-08-20 10:37:05.09	1534761425090	0	key-7	7	08/20/2018 03:37:05	test-user-7	page is 700	-7	702.8000000000001	true	7
+2018-08-20 11:37:05.09	1534765025090	0	key-8	8	08/20/2018 04:37:05	test-user-8	page is 800	-8	803.2	true	8
+2018-08-20 12:37:05.09	1534768625090	0	key-9	9	08/20/2018 05:37:05	test-user-9	page is 900	-9	903.6	false	9
+2018-08-20 13:37:05.09	1534772225090	0	key-10	10	08/20/2018 06:37:05	test-user-10	page is 1000	-10	1004.0	true	10
PREHOOK: query: CREATE EXTERNAL TABLE kafka_table_insert (c_name string, c_int int, c_float float)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
@@ -917,23 +915,23 @@ TBLPROPERTIES
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@kafka_table_insert
-PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test1',5, 4.999,'key',null ,-1,1536449552290,-1,null )
+PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test1',5, 4.999,'key',null ,-1,1536449552290)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@kafka_table_insert
-POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test1',5, 4.999,'key',null ,-1,1536449552290,-1,null )
+POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test1',5, 4.999,'key',null ,-1,1536449552290)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@kafka_table_insert
-PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test2',15, 14.9996666, null ,null ,-1,1536449552285,-1,-1 )
+PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test2',15, 14.9996666, null ,null ,-1,1536449552285)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@kafka_table_insert
-POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test2',15, 14.9996666, null ,null ,-1,1536449552285,-1,-1 )
+POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test2',15, 14.9996666, null ,null ,-1,1536449552285)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@kafka_table_insert
@@ -945,13 +943,13 @@ POSTHOOK: query: select * from kafka_table_insert
POSTHOOK: type: QUERY
POSTHOOK: Input: default@kafka_table_insert
POSTHOOK: Output: hdfs://### HDFS PATH ###
-test1	5	4.999	key	0	0	1536449552290	0	2
-test2	15	14.999666	NULL	0	1	1536449552285	0	2
+test1	5	4.999	key	0	0	1536449552290
+test2	15	14.999666	NULL	0	1	1536449552285
PREHOOK: query: insert into table wiki_kafka_avro_table select isrobot as isrobot, channel as channel,`timestamp` as `timestamp`, flags as flags, isunpatrolled as isunpatrolled, page as page, diffurl as diffurl, added as added, comment as comment, commentlength as commentlength, isnew as isnew, isminor as isminor, delta as delta, isanonymous as isanonymous, `user` as `user`, deltabucket as detlabucket, deleted as deleted, namespace as namespace,
-`__key`, `__partition`, -1 as `__offset`,`__timestamp`, -1 as `__start_offset`, -1 as `__end_offset`
+`__key`, `__partition`, -1 as `__offset`,`__timestamp`
from wiki_kafka_avro_table
PREHOOK: type: QUERY
PREHOOK: Input: default@wiki_kafka_avro_table
@@ -960,16 +958,18 @@
POSTHOOK: query: insert into table wiki_kafka_avro_table select isrobot as isrobot, channel as channel,`timestamp` as `timestamp`, flags as flags, isunpatrolled as isunpatrolled, page as page, diffurl as diffurl, added as added, comment as comment, commentlength as commentlength, isnew as isnew, isminor as isminor, delta as delta, isanonymous as isanonymous, `user` as `user`, deltabucket as detlabucket, deleted as deleted, namespace as namespace,
-`__key`, `__partition`, -1 as `__offset`,`__timestamp`, -1 as `__start_offset`, -1 as `__end_offset`
+`__key`, `__partition`, -1 as `__offset`,`__timestamp`
from wiki_kafka_avro_table
POSTHOOK: type: QUERY
POSTHOOK: Input: default@wiki_kafka_avro_table
POSTHOOK: Output: default@wiki_kafka_avro_table
-PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
+PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+`page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
PREHOOK: type: QUERY
PREHOOK: Input: default@wiki_kafka_avro_table
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
+POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+`page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
POSTHOOK: type: QUERY
POSTHOOK: Input: default@wiki_kafka_avro_table
POSTHOOK: Output: hdfs://### HDFS PATH ###
@@ -1061,31 +1061,31 @@ POSTHOOK: query: ALTER TABLE kafka_table_csv SET TBLPROPERTIES ("hive.kafka.opti
POSTHOOK: type: ALTERTABLE_PROPERTIES
POSTHOOK: Input: default@kafka_table_csv
POSTHOOK: Output: default@kafka_table_csv
-PREHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp`, -1, -1 from kafka_table_insert
+PREHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp` from kafka_table_insert
PREHOOK: type: QUERY
PREHOOK: Input: default@kafka_table_insert
PREHOOK: Output: default@kafka_table_csv
-POSTHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp`, -1, -1 from kafka_table_insert
+POSTHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp` from kafka_table_insert
POSTHOOK: type: QUERY
POSTHOOK: Input: default@kafka_table_insert
POSTHOOK: Output: default@kafka_table_csv
-PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291,-1,null )
+PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@kafka_table_csv
-POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291,-1,null )
+POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@kafka_table_csv
-PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284,-1,-1 )
+PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@kafka_table_csv
-POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284,-1,-1 )
+POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@kafka_table_csv
@@ -1097,7 +1097,7 @@ POSTHOOK: query: select * from kafka_table_csv
POSTHOOK: type: QUERY
POSTHOOK: Input: default@kafka_table_csv
POSTHOOK: Output: hdfs://### HDFS PATH ###
-test1	5	4.999	key	0	0	1536449552290	0	7
-test2	15	14.999666	NULL	0	1	1536449552285	0	7
-test4	-5	-4.999	key-2	0	3	1536449552291	0	7
-test5	-15	-14.9996666	key-3	0	5	1536449552284	0	7
+test1	5	4.999	key	0	0	1536449552290
+test2	15	14.999666	NULL	0	1	1536449552285
+test4	-5	-4.999	key-2	0	3	1536449552291
+test5	-15	-14.9996666	key-3	0	5	1536449552284