Hadoop Common / HADOOP-16021

SequenceFile.createWriter appendIfExists codec causes NullPointerException


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 2.7.3
    • Fix Version/s: None
    • Component/s: common
    • Environment: Windows 10 or Linux (CentOS), Hadoop 2.7.3, JDK 8

    Description

       
      I want to append data to an existing file. When I use SequenceFile.Writer.appendIfExists, it throws a NullPointerException at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1119).

      When I remove the appendIfExists option it works, but it overwrites the old file.

       

      When I try CompressionType.RECORD or CompressionType.BLOCK, it throws a "not supported" exception.

       

      // my code
      SequenceFile.Writer writer = SequenceFile.createWriter(conf,
          SequenceFile.Writer.file(path),
          SequenceFile.Writer.keyClass(Text.class),
          SequenceFile.Writer.valueClass(Text.class),
          SequenceFile.Writer.appendIfExists(true));
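
      For reference, here is a minimal standalone sketch of the failing call outside Spark (the class name, path, and key/value strings are placeholders, not from the actual job). On the first run the file does not exist and the writer is created normally; running it again, so that appendIfExists(true) hits an already existing file, is what produces the NullPointerException on 2.7.3.

      // standalone sketch (illustrative only)
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IOUtils;
      import org.apache.hadoop.io.SequenceFile;
      import org.apache.hadoop.io.Text;

      public class AppendRepro {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              Path path = new Path("/tmp/append-test.seq"); // placeholder path

              // same option combination as above: uncompressed writer with appendIfExists(true)
              SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                      SequenceFile.Writer.file(path),
                      SequenceFile.Writer.keyClass(Text.class),
                      SequenceFile.Writer.valueClass(Text.class),
                      SequenceFile.Writer.appendIfExists(true));
              try {
                  writer.append(new Text("key"), new Text("value"));
              } finally {
                  IOUtils.closeStream(writer);
              }
          }
      }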
      

       

      // all my code
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IOUtils;
      import org.apache.hadoop.io.SequenceFile;
      import org.apache.hadoop.io.Text;
      import org.apache.spark.TaskContext;
      import org.apache.spark.api.java.function.VoidFunction;
      import scala.Tuple2;

      import java.util.Iterator;

      public class Writer1 implements VoidFunction<Iterator<Tuple2<String, String>>> {
          private static Configuration conf = new Configuration();
          private int MAX_LINE = 3; // small number, just for testing

          @Override
          public void call(Iterator<Tuple2<String, String>> iterator) throws Exception {
              int partitionId = TaskContext.get().partitionId();
              int count = 0;
              SequenceFile.Writer writer = null;
              while (iterator.hasNext()) {
                  Tuple2<String, String> tp = iterator.next();
                  Path path = new Path("D:/tmp-doc/logs/logs.txt");

                  if (writer == null)
                      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
                              SequenceFile.Writer.keyClass(Text.class),
                              SequenceFile.Writer.valueClass(Text.class),
                              SequenceFile.Writer.appendIfExists(true));

                  writer.append(new Text(tp._1), new Text(tp._2));
                  count++;

                  // roll the file after MAX_LINE records: close it and re-open with appendIfExists(true)
                  if (count > MAX_LINE) {
                      IOUtils.closeStream(writer);
                      count = 0;
                      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
                              SequenceFile.Writer.keyClass(Text.class),
                              SequenceFile.Writer.valueClass(Text.class),
                              SequenceFile.Writer.appendIfExists(true));
                  }
              }
              // closeStream is null-safe, so a single close at the end is enough
              IOUtils.closeStream(writer);
          }
      }
      

       // the code above is called from the Spark driver below

      import com.xxx.algo.hadoop.Writer1
      import com.xxx.algo.utils.Utils
      import kafka.serializer.StringDecoder
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.streaming.kafka.KafkaUtils
      import org.apache.spark.streaming.{Durations, StreamingContext}
      import org.apache.spark.{SparkConf, SparkContext}
      
      
      object KafkaSparkStreamingApp {
        def main(args: Array[String]): Unit = {
          val kafka = "192.168.30.4:9092,192.168.30.5:9092,192.168.30.6:9092"
          val zk = "192.168.30.4:2181,192.168.30.5:2181,192.168.30.6:2181"
          val topics = Set("test.aries.collection.appevent.biz")
          val tag = "biz"
          val durationMillis = 5000 // batch interval: 5 seconds
          val conf = new SparkConf()
          conf.setAppName("user-log-consumer")
            .set("spark.serilizer","org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrationRequired", "true")
            .set("spark.defalut.parallelism","2")
            .set("spark.rdd.compress","true")
            .setMaster("local[2]")
          val sc = new SparkContext(conf)
          val session = SparkSession.builder()
            .config(conf)
            .getOrCreate()
          val ssc = new StreamingContext(sc, Durations.milliseconds(durationMillis))
          val kafkaParams = Map[String, String](
            "metadata.broker.list" -> kafka,
            "bootstrap.servers" -> kafka,
            "zookeeper.connect" -> zk,
            "group.id" -> "recommend_stream_spark",
            "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
            "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
          )
          val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc,
            kafkaParams,
            topics
          )
          val timeFieldName = "log_time"
          stream.foreachRDD(rddMsg => {
            rddMsg.map(msg => {
              val value = msg._2
              val time = Utils.getTime(value, timeFieldName)
              new Tuple2(time + "," + tag, value)
            })
              .toJavaRDD().foreachPartition(new Writer1()) // Writer1 (above) is invoked here, once per partition
          })
          ssc.start()
          ssc.awaitTermination()
        }
      }
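
       For anyone who wants to reproduce this without Kafka, a small local driver like the one below (hypothetical, not part of my job; the tuple contents are made up) pushes enough records through Writer1 in a single partition that the writer is rolled and re-opened with appendIfExists(true) on an existing file:

      // hypothetical local driver, only to exercise Writer1 without the streaming job
      import java.util.Arrays;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaSparkContext;
      import scala.Tuple2;

      public class LocalWriter1Driver {
          public static void main(String[] args) {
              SparkConf conf = new SparkConf().setAppName("writer1-local").setMaster("local[2]");
              JavaSparkContext sc = new JavaSparkContext(conf);
              // one slice, so a single call() sees more than MAX_LINE records and has to roll the writer
              sc.parallelize(Arrays.asList(
                      new Tuple2<>("k1,biz", "v1"),
                      new Tuple2<>("k2,biz", "v2"),
                      new Tuple2<>("k3,biz", "v3"),
                      new Tuple2<>("k4,biz", "v4"),
                      new Tuple2<>("k5,biz", "v5")), 1)
                .foreachPartition(new Writer1());
              sc.stop();
          }
      }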
      

      More info: https://stackoverflow.com/questions/53943978/hadoop-sequencefile-createwriter-appendifexists-codec-cause-nullpointerexception

      Attachments

        1. CompressionType.NONE-NullPointerException-error log.txt (18 kB, asin)
        2. CompressionType.BLOCK-Not supported-error log.txt (19 kB, asin)
        3. 62.png (556 kB, asin)
        4. 055.png (303 kB, asin)


            People

              Assignee: Unassigned
              Reporter: asin (xinkenny)
              Votes: 0
              Watchers: 3
