Apache Storm / STORM-3473

Hive can't read records written from HiveBolt


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: storm-hive
    • Labels: None

    Description

      I'm trying to stream items from Storm into Hive using the HiveBolt, but Hive does not seem to see the records at all.

      Test program:

      package com.datto.hivetest;
      
      import org.apache.storm.Config;
      import org.apache.storm.StormSubmitter;
      import org.apache.storm.generated.AlreadyAliveException;
      import org.apache.storm.generated.AuthorizationException;
      import org.apache.storm.generated.InvalidTopologyException;
      import org.apache.storm.hive.bolt.HiveBolt;
      import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
      import org.apache.storm.hive.common.HiveOptions;
      import org.apache.storm.spout.SpoutOutputCollector;
      import org.apache.storm.streams.StreamBuilder;
      import org.apache.storm.task.TopologyContext;
      import org.apache.storm.topology.OutputFieldsDeclarer;
      import org.apache.storm.topology.base.BaseRichSpout;
      import org.apache.storm.tuple.Fields;
      import org.apache.storm.tuple.Values;
      import org.apache.storm.utils.Time;
      
      import java.util.Map;
      import java.util.Random;
      
      public class MainStorm {
      	public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
      		// Stream into default.test_table via the metastore; "<url>" is the (redacted) Hive metastore URI.
      		HiveOptions hiveOptions = new HiveOptions(
      			"<url>",
      			"default",
      			"test_table",
      			new JsonRecordHiveMapper()
      				.withColumnFields(new Fields("value"))
      		)
      			.withAutoCreatePartitions(true);
      
      		StreamBuilder builder = new StreamBuilder();
      		// Lowercase each word from the spout and hand the result to the HiveBolt.
      		builder.newStream(new TestSpout())
      			.map(tup -> tup.getStringByField("word").toLowerCase())
      			.to(new HiveBolt(hiveOptions));
      
      		Config config = new Config();
      		config.setMessageTimeoutSecs(30);
      		config.setMaxSpoutPending(1024);
      		// Put the cluster's Hadoop/Hive client configuration on the worker classpath.
      		config.setClasspath("/etc/hadoop/conf/");
      
      		StormSubmitter.submitTopology("hive-test", config, builder.build());
      	}
      
      	public static class TestSpout extends BaseRichSpout {
      		private transient SpoutOutputCollector out;
      		private transient Random random;
      
      		@Override
      		public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
      			out = collector;
      			random = new Random();
      		}
      
      		@Override
      		public void nextTuple() {
      			// Throttle emission to roughly ten tuples per second.
      			try {
      				Time.sleep(100);
      			} catch (InterruptedException e) {
      				Thread.currentThread().interrupt();
      				throw new RuntimeException(e);
      			}
      
      			final String[] words = new String[]{ "nathan", "mike", "jackson", "golda", "bertels" };
      			final String word = words[random.nextInt(words.length)];
      			out.emit(new Values(word));
      		}
      
      		@Override
      		public void declareOutputFields(OutputFieldsDeclarer declarer) {
      			declarer.declare(new Fields("word"));
      		}
      	}
      }
      

      Table creation:

      CREATE TABLE test_table (value string) CLUSTERED BY (value) INTO 4 BUCKETS STORED AS ORC TBLPROPERTIES('orc.compress' = 'ZLIB', 'transactional' = 'true');
      
      GRANT ALL ON test_table TO USER storm;
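
      For context, the Hive streaming API that HiveBolt is built on requires the target table to be bucketed, stored as ORC, and marked transactional, which the DDL above satisfies. One way to double-check that the properties took effect is over JDBC; a minimal sketch, assuming the hive-jdbc driver is on the classpath, with the HiveServer2 host as a placeholder:

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.ResultSet;
      import java.sql.Statement;

      public class CheckTableProps {
      	public static void main(String[] args) throws Exception {
      		// Hypothetical HiveServer2 URL; substitute the real host and port.
      		try (Connection conn = DriverManager.getConnection("jdbc:hive2://<host>:10000/default");
      		     Statement stmt = conn.createStatement();
      		     // SHOW TBLPROPERTIES returns one (name, value) row per property.
      		     ResultSet rs = stmt.executeQuery("SHOW TBLPROPERTIES test_table")) {
      			while (rs.next()) {
      				// Expect 'transactional' = 'true' in the output.
      				System.out.println(rs.getString(1) + "\t" + rs.getString(2));
      			}
      		}
      	}
      }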

      Setting the ACL:

      sudo -u hdfs hdfs dfs -setfacl -m user:storm:rwx /warehouse/tablespace/managed/hive/test_table
      sudo -u hdfs hdfs dfs -setfacl -m default:user:storm:rwx /warehouse/tablespace/managed/hive/test_table
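
      To rule out a permissions problem from the client side, the ACLs can also be checked programmatically; a minimal sketch using the Hadoop FileSystem API, assuming the cluster configuration under /etc/hadoop/conf is on the classpath so fs.defaultFS resolves:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.fs.permission.AclStatus;

      public class CheckAcl {
      	public static void main(String[] args) throws Exception {
      		FileSystem fs = FileSystem.get(new Configuration());
      		// Should list user:storm:rwx and default:user:storm:rwx after the setfacl calls above.
      		AclStatus acl = fs.getAclStatus(new Path("/warehouse/tablespace/managed/hive/test_table"));
      		System.out.println(acl);
      	}
      }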
      

      Hive results after running for around 10 minutes:

      > SELECT COUNT(*) FROM test_table;
      INFO  : Compiling command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): SELECT COUNT(*) FROM test_table
      INFO  : Semantic Analysis Completed (retrial = false)
      INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)
      INFO  : Completed compiling command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); Time taken: 1.138 seconds
      INFO  : Executing command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): SELECT COUNT(*) FROM test_table
      INFO  : Completed executing command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); Time taken: 0.013 seconds
      INFO  : OK
      +------+
      | _c0  |
      +------+
      | 0    |
      +------+
      

      So Hive thinks there are no rows, which isn't good. But if I look at HDFS, there are files there:

      # sudo -u hdfs hdfs dfs -ls -R -h /warehouse/tablespace/managed/hive/test_table
      drwxrwx---+  - storm hadoop          0 2019-07-22 19:15 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100
      -rw-rw----+  3 storm hadoop          1 2019-07-22 19:15 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/_orc_acid_version
      -rw-rw----+  3 storm hadoop     74.4 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001
      -rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001_flush_length
      -rw-rw----+  3 storm hadoop     73.4 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002
      -rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002_flush_length
      -rw-rw----+  3 storm hadoop     84.9 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003
      -rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003_flush_length
      

      And they seem to have valid rows:

      ❯❯❯ ./orc-contents /tmp/bucket_00002  | head
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 0, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 1, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 2, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 3, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 4, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 5, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 6, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 7, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 8, "currentTransaction": 1, "row": {"value": "bertels"}}
      {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 9, "currentTransaction": 1, "row": {"value": "bertels"}}
      

      I can insert into the table manually, and I've also written a test Java program that uses the Hive streaming API to write one row; Hive sees those inserts (see the sketch below). I don't see any errors in the Storm logs; the tuples seem to be flushed and acked OK. I don't think I've seen any errors in the metastore logs either.
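
      For reference, the one-row writer was along these lines; a minimal sketch against the legacy org.apache.hive.hcatalog.streaming API (the same API storm-hive's HiveBolt is built on), with the metastore URI and the row contents as placeholders:

      import java.nio.charset.StandardCharsets;

      import org.apache.hive.hcatalog.streaming.HiveEndPoint;
      import org.apache.hive.hcatalog.streaming.StreamingConnection;
      import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
      import org.apache.hive.hcatalog.streaming.TransactionBatch;

      public class OneRowStreamingWriter {
      	public static void main(String[] args) throws Exception {
      		// Same metastore URI as the topology; null partition values for an unpartitioned table.
      		HiveEndPoint endPoint = new HiveEndPoint("<url>", "default", "test_table", null);
      		StreamingConnection conn = endPoint.newConnection(true);
      		StrictJsonWriter writer = new StrictJsonWriter(endPoint);

      		// Write one JSON row in a single transaction and commit it.
      		TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
      		txnBatch.beginNextTransaction();
      		txnBatch.write("{\"value\": \"manual\"}".getBytes(StandardCharsets.UTF_8));
      		txnBatch.commit();
      		txnBatch.close();
      		conn.close();
      	}
      }

      Rows committed this way were visible to Hive, unlike the rows written by the bolt.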

      Anyone know what's up? I can get more info if needed.

People

    Assignee: Unassigned
    Reporter: Alex Parrill (datto_aparrill)
