From dfac5b30da0c8c3e6a9af035ac0bdb4889033416 Mon Sep 17 00:00:00 2001
From: Panos Garefalakis
Date: Mon, 18 May 2020 16:51:15 +0100
Subject: [PATCH] Get rid of skipCorrupt as part of ORC read pipeline

Change-Id: Ic1efd6dcffc71adfa1ac3059ceacbd3f30e6ef7e
---
 .../llap/io/decode/GenericColumnVectorProducer.java  |  3 +--
 .../hive/llap/io/decode/OrcColumnVectorProducer.java |  5 +----
 .../hive/llap/io/decode/OrcEncodedDataConsumer.java  | 10 +++-------
 3 files changed, 5 insertions(+), 13 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
index 16176929de4..1c7e537890f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
@@ -85,8 +85,7 @@ public ReadPipeline createReadPipeline(Consumer consumer, Fil
       SchemaEvolutionFactory sef, InputFormat sourceInputFormat, Deserializer sourceSerDe,
       Reporter reporter, JobConf job, Map parts) throws IOException {
     cacheMetrics.incrCacheReadRequests();
-    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
-        consumer, includes, false, counters, ioMetrics);
+    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, includes, counters, ioMetrics);
     SerDeFileMetadata fm;
     try {
       fm = new SerDeFileMetadata(sourceSerDe);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 17c4821ec67..50abdfd8311 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -56,7 +56,6 @@
   private final LowLevelCache lowLevelCache;
   private final BufferUsageManager bufferManager;
   private final Configuration conf;
-  private boolean _skipCorrupt; // TODO: get rid of this
   private LlapDaemonCacheMetrics cacheMetrics;
   private LlapDaemonIOMetrics ioMetrics;
   // TODO: if using in multiple places, e.g. SerDe cache, pass this in.
@@ -73,7 +72,6 @@ public OrcColumnVectorProducer(MetadataCache metadataCache,
     this.lowLevelCache = lowLevelCache;
     this.bufferManager = bufferManager;
     this.conf = conf;
-    this._skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
     this.cacheMetrics = cacheMetrics;
     this.ioMetrics = ioMetrics;
     this.tracePool = tracePool;
@@ -90,8 +88,7 @@ public ReadPipeline createReadPipeline(
       InputFormat unused0, Deserializer unused1,
       Reporter reporter, JobConf job, Map parts) throws IOException {
     cacheMetrics.incrCacheReadRequests();
-    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
-        consumer, includes, _skipCorrupt, counters, ioMetrics);
+    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, includes, counters, ioMetrics);
     OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
         metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool, parts);
     edc.init(reader, reader, reader.getTrace());
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index b697a0d573a..79dba426596 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -70,20 +70,16 @@
   private ConsumerFileMetadata fileMetadata; // We assume one request is only for one file.
   private CompressionCodec codec;
   private List stripes;
-  private final boolean skipCorrupt; // TODO: get rid of this
   private SchemaEvolution evolution;
   private IoTrace trace;
   private final Includes includes;
   private TypeDescription[] batchSchemas;
   private boolean useDecimal64ColumnVectors;
 
-  public OrcEncodedDataConsumer(
-      Consumer consumer, Includes includes, boolean skipCorrupt,
-      QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) {
+  public OrcEncodedDataConsumer(Consumer consumer, Includes includes,
+      QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) {
     super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics, counters);
     this.includes = includes;
-    // TODO: get rid of this
-    this.skipCorrupt = skipCorrupt;
     if (includes.isProbeDecodeEnabled()) {
       LlapIoImpl.LOG.info("OrcEncodedDataConsumer probeDecode is enabled with cacheKey {} colIndex {} and colName {}",
           this.includes.getProbeCacheKey(), this.includes.getProbeColIdx(), this.includes.getProbeColName());
@@ -225,7 +221,7 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
   private void createColumnReaders(OrcEncodedColumnBatch batch,
       ConsumerStripeMetadata stripeMetadata, TypeDescription fileSchema) throws IOException {
     TreeReaderFactory.Context context = new TreeReaderFactory.ReaderContext()
-        .setSchemaEvolution(evolution).skipCorrupt(skipCorrupt)
+        .setSchemaEvolution(evolution)
         .writerTimeZone(stripeMetadata.getWriterTimezone())
         .fileFormat(fileMetadata == null ? null : fileMetadata.getFileVersion())
         .useUTCTimestamp(true)
-- 
2.20.1 (Apple Git-117)
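
Note (not part of the patch): the removed _skipCorrupt field was just a cached copy of the ORC
skip-corrupt setting. The minimal standalone sketch below only illustrates that existing lookup,
OrcConf.SKIP_CORRUPT_DATA.getBoolean(Configuration), which this change drops from the LLAP read
pipeline setup; the class name and main method here are illustrative, not Hive code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.orc.OrcConf;

    // Illustrative only: prints the skip-corrupt flag that OrcColumnVectorProducer
    // no longer caches after this patch. It defaults to false unless
    // orc.skip.corrupt.data / hive.exec.orc.skip.corrupt.data is set in the Configuration.
    public class SkipCorruptConfCheck {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        boolean skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
        System.out.println("skip corrupt data: " + skipCorrupt);
      }
    }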