Uploaded image for project: 'Hive'
  1. Hive
  2. HIVE-15658

hive.ql.session.SessionState start() is not atomic, SessionState thread local variable can get into inconsistent state

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.1.0, 1.2.1, 2.0.0, 2.0.1
    • Fix Version/s: None
    • Component/s: API, HCatalog, Transactions
    • Labels:
      None
    • Environment:

      CDH5.8.0, Flume 1.6.0, Hive 1.1.0

      Description

      Method start() in hive.ql.session.SessionState is supposed to setup needed preconditions, like HDFS scratch directories for session.
      This happens to be not an atomic operation with setting thread local variable, which can later be obtained by calling SessionState.get().
      Therefore, even is the start() method itself fails, the SessionState.get() does not return null and further re-use of the thread which previously invoked start() may lead to obtaining SessionState object in inconsistent state.

      I have observed this using Flume Hive Sink, which uses Hive Streaming interface. When the directory /tmp/hive is not writable by session user, the start() method fails (throwing RuntimeException). If the thread is re-used (like it is in Flume), further executions work with wrongly initialized SessionState object (HDFS dirs are non-existent). In Flume, this happens to me when Flume should create partition if not exists (but the code doing this is in Hive Streaming).

      Steps to reproduce:
      0. create test spooldir and allow flume to write to it, in my case /home/ubuntu/flume_test, 775, ubuntu:flume
      1. create Flume config (see attachment)
      2. create Hive table

      create table default.flume_test (column1 string, column2 string) partitioned by (dt string) clustered by (column1) INTO 2 BUCKETS STORED AS ORC;
      

      3. start flume agent:

      bin/flume-ng agent -n a1 -c conf -f conf/flume-config.txt
      

      4. hdfs dfs -chmod 600 /tmp/hive
      5. put this file into spooldir:

      echo value1,value2 > file1
      

      Expected behavior:
      Exception regarding scratch dir permissions to be thrown repeatedly.
      example (note that the line numbers are wrong as Cloudera is cloning the source codes here https://github.com/cloudera/flume-ng/ and here https://github.com/cloudera/hive):

      2017-01-18 12:39:38,926 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
      org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] } 
              at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
              at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
              at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
              at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
              at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
              at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
              at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:380)
              at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
              ... 6 more
      Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-------
              at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:540)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:358)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
              at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
              at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
              at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
              at java.util.concurrent.FutureTask.run(FutureTask.java:262)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              ... 1 more
      Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-------
              at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
              at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
              at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
              ... 13 more
      

      Actual behavior:
      Exception regarding scratch dir permissions thrown once, meaningless exceptions from code, which should be unreachable, are re-thrown again and again, obfuscating the
      source of the problem to the user.
      exceptions thrown repeatedly:

      java.lang.NullPointerException: Non-local session path expected to be non-null
              at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
              at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
              at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
              at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
              at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
              at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
              at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
              at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
              at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
              at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
              at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
              at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
              at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
              at java.util.concurrent.FutureTask.run(FutureTask.java:262)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
      
      2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
      org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
              at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
              at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
              at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
              at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
              at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
              at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition values=[20170118]. Unable to get path for end point: [20170118]
              at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
              at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
              at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
              at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
              at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
              ... 6 more
      Caused by: NoSuchObjectException(message:partition values=[20170118])
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
              at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
              at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
              at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
              ... 10 more
      

      Detailed description on whats going on:
      Flume, as the Hive Streaming client, does the streaming in the HiveSink class, main part is done on line
      https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L253
      where one "Batch" is drained (batch in sense of flume batch of incoming messages from channel).
      Main for loop for batch drain is:
      https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L282
      Flume creates hive endpoint for each line it tries to insert into Hive (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L290), not very effective, but, the .equals in HiveEndPoint is properly written, so everything works.
      Then, it creates the helper HiveWriter (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L295), which
      is cached - one for each HiveEndPoint, if no HiveWriter for endpoint exists, it is created on line
      https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L343

      Inspecting the constructor of HiveWriter, brings us to creating new connection to Hive using the Streaming API:
      https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86
      The connection is created in a separate thread:
      https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L376
      as the submitted Future (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L425)
      into the thread pool callTimeoutPool (the pool comes from HiveWriter https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L493 and is of constant size 1, which seems like Flume is using 1 thread per Hive Sink to talk with Hive.

      When creating newConnection (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379),
      with the request of autoCreatePartitions=true, the HiveEndPoint, the entry point to Hive Streaming is called : https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L105
      As I was testing non-authenticated, it boils to https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L192
      and finally to https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L215

      Constructor for inner private class ConnectionImpl then tries to create partition if it not exists, on the line https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L318
      And the trouble starts in method createPartitionIfNotExists on line
      https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L455
      as the SessionState.get() returns null - we did not started the session yet, we try to create a new one.
      In SessionState.start() first thing done is registering the object itself as the threadlocal variable:
      https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526

      Thereafter, the directories (scratchdir and subdirs) are tried to be created:
      https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L548
      but if this fails, the RuntimeException (from https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L619 and https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L677) is not caught in the catch blocks (nor there is any finally block).

      So basically, SessionState.start() has failed with proper initialization (e.g. HDFS dirs are not created, nor is the SessionState.hdfsSessionPath set to non-null) and yet the execution continues.
      With RuntimeException thrown from .start() method, the caller (HiveEndPoint) propagates the exception back to the HiveWriter https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379

      The exception is caught https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L442 but handled only as do logging and go on: https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L456
      This is the moment this exception is logged:

      Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-------
              at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
              at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
              at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
              ... 13 more
      

      What happens next? Flume re-runs the delivery, calling HiveSink.process, boiling into newConnection again. But Flume uses the SAME and exact one thread it used before to do this.
      This time, the if clause: https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L454
      returns true, as the SessionState.get() return the incorrectly initialized SessionState from previous attempt.
      Then, it goes into https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L466 and down to the
      https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L738 which fails on null value of hdfsSessionPath in SessionState.

      But this RuntimeException (NullPointerException) is not caught by https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L470 and so it is logged:

      2017-01-18 12:39:44,194 ERROR org.apache.hadoop.hive.ql.Driver: FAILED: NullPointerException Non-local session path expected to be non-null
      java.lang.NullPointerException: Non-local session path expected to be non-null
              at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
              at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
              at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
              at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
              at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
              at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
              at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
              at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
              at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
              at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
              at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
              at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
              at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
              at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
              at java.util.concurrent.FutureTask.run(FutureTask.java:262)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
      

      Sometimes, Flume manages to run through the https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86 as the newConnection is created in separate thread, the Flume rushes into https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L89 creating another meaningless exception:

      2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
      org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
              at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
              at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
              at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
              at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
              at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
              at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition values=[20170118]. Unable to get path for end point: [20170118]
              at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
              at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
              at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
              at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
              at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
              ... 6 more
      Caused by: NoSuchObjectException(message:partition values=[20170118])
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
              at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
              at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
              at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
              at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
              ... 10 more
      

      Proposing solution:
      If Hive Streaming API is allowed to be used with same thread again (which probably is), then the threadlocal set in
      https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526
      has to be unset in case of any exception in proceeding blocks:
      https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L539
      so set the thread local back to null before rethrowing exceptions here:
      https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L568
      and here:
      https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L602

      Links to source codes are from latest version, although I have been doing testing on Hive 1.1.0. From code, it seems like
      bug has to be present also in recent versions.

      If Hive Streaming API is not allowed to be called by reusing threads, then not only Flume, but probably also NiFi client (https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java#L237) has to be fixed (well, NiFi just copy&pasted the Flume codebase, is there any other copy of this HiveWriter out there?).

        Attachments

        1. HIVE-15658_branch-2.1_1.patch
          2 kB
          Michal Klempa
        2. HIVE-15658_branch-1.2_1.patch
          2 kB
          Michal Klempa

          Issue Links

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                michal.klempa Michal Klempa
              • Votes:
                0 Vote for this issue
                Watchers:
                6 Start watching this issue

                Dates

                • Created:
                  Updated: