Description
I want to append data to an existing file. When I use SequenceFile.Writer.appendIfExists, it throws a NullPointerException at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1119).
When I remove the 'appendIfExists' option, it works, but it overwrites the old file.
When I try CompressionType.RECORD or CompressionType.BLOCK, it throws a "not supported" exception.
// my code
SequenceFile.Writer writer = null;
writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.appendIfExists(true));
// all my code
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Writer1 implements VoidFunction<Iterator<Tuple2<String, String>>> {
    private static Configuration conf = new Configuration();
    private int MAX_LINE = 3; // small number, for testing

    @Override
    public void call(Iterator<Tuple2<String, String>> iterator) throws Exception {
        int partitionId = TaskContext.get().partitionId();
        int count = 0;
        SequenceFile.Writer writer = null;
        while (iterator.hasNext()) {
            Tuple2<String, String> tp = iterator.next();
            Path path = new Path("D:/tmp-doc/logs/logs.txt");
            if (writer == null) {
                writer = SequenceFile.createWriter(conf,
                        SequenceFile.Writer.file(path),
                        SequenceFile.Writer.keyClass(Text.class),
                        SequenceFile.Writer.valueClass(Text.class),
                        SequenceFile.Writer.appendIfExists(true));
            }
            writer.append(new Text(tp._1), new Text(tp._2));
            count++;
            if (count > MAX_LINE) {
                // roll the writer: close it and reopen in append mode
                IOUtils.closeStream(writer);
                count = 0;
                writer = SequenceFile.createWriter(...); // same options as above
            }
        }
        // closeStream is null-safe, so one close at the end is enough
        IOUtils.closeStream(writer);
    }
}
// The code above is called from the Scala streaming job below.
import com.xxx.algo.hadoop.Writer1
import com.xxx.algo.utils.Utils
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KafkaSparkStreamingApp {
  def main(args: Array[String]): Unit = {
    val kafka = "192.168.30.4:9092,192.168.30.5:9092,192.168.30.6:9092"
    val zk = "192.168.30.4:2181,192.168.30.5:2181,192.168.30.6:2181"
    val topics = Set("test.aries.collection.appevent.biz")
    val tag = "biz"
    val durationSeconds = 5000

    val conf = new SparkConf()
    conf.setAppName("user-log-consumer")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.default.parallelism", "2")
      .set("spark.rdd.compress", "true")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val session = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(sc, Durations.milliseconds(durationSeconds))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> kafka,
      "bootstrap.servers" -> kafka,
      "zookeeper.connect" -> zk,
      "group.id" -> "recommend_stream_spark",
      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics
    )

    val timeFieldName = "log_time"
    stream.foreachRDD(rddMsg => {
      rddMsg.map(msg => {
        val value = msg._2
        val time = Utils.getTime(value, timeFieldName)
        new Tuple2(time + "," + tag, value)
      })
        .toJavaRDD().foreachPartition(new Writer1()) // here the writer is invoked per partition
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
More info: https://stackoverflow.com/questions/53943978/hadoop-sequencefile-createwriter-appendifexists-codec-cause-nullpointerexception
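For reference, a minimal workaround sketch (an assumption based on the duplicated issue listed below, not a verified fix for this exact setup): pass an explicit compression option, identical to the one the file was created with, on every createWriter call, so the append-time compression check does not end up comparing against a null codec. CompressionType.RECORD and DefaultCodec are only illustrative choices here; if the existing file was written with CompressionType.NONE, it may need to be rewritten with a codec first, since appending to an uncompressed SequenceFile is exactly what HADOOP-13138 reports as broken.

// Workaround sketch (assumption): use the same explicit compression option for the
// initial write and for every append. RECORD + DefaultCodec are illustrative only.
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.appendIfExists(true),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD,
                new org.apache.hadoop.io.compress.DefaultCodec()));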
Attachments
Issue Links
- duplicates HADOOP-13138: Unable to append to a SequenceFile with Compression.NONE (Resolved)