Details
-
Bug
-
Status: Open
-
P3
-
Resolution: Unresolved
-
None
-
None
-
None
Description
I use Beam on a Spark cluster. The application is below.
// Reproducer: a streaming Beam pipeline on the Spark runner that reads strings from Kafka,
// maps them to key/value pairs, groups them per key inside fixed windows, and writes one
// line per group to a local file.
// NOTE(review): several statements look like they were lost when the snippet was pasted
// (see the notes below); it does not compile as shown.

// --- Runner configuration ---
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setEnableSparkMetricSinks(false);
options.setStreaming(true);
// NOTE(review): 6066 is usually the standalone REST submission port, while the master
// itself normally listens on 7077 -- confirm this URL is the intended master address.
options.setSparkMaster("spark://10.100.124.205:6066");
// A random float is appended so each submission gets a distinct app name and job name.
options.setAppName("Beam App Spark"+new Random().nextFloat());
options.setJobName("Beam Job Spark"+new Random().nextFloat());
System.out.println("App Name:"+options.getAppName());
System.out.println("Job Name:"+options.getJobName());
options.setMaxRecordsPerBatch(100000L);
// PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

// --- Windowing ---
// Duration size = Duration.standardMinutes(4);
// Window length in seconds; defaults to 60.
long duration = 60;
// NOTE(review): the body of this `if` appears to be missing -- presumably it parsed
// args[0] into `duration`. As pasted, the declaration of `size` becomes the `if` body.
if(args!=null && args.length==1)
Duration size = Duration.standardSeconds(duration);
// Prints the window length; the message is Chinese for "time window is [N] seconds".
System.out.println("时间窗口为:["+duration+"]秒");
Window.Bound<KV<String,String>> fixWindow = Window.<KV<String,String>> into(
FixedWindows.of(size)
);

// --- Kafka source ---
String kafkaAddress = "10.100.124.208:9093";
// String kafkaAddress = "192.168.100.212:9092";
Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
// Consumer property passed through to Kafka via updateConsumerProperties below.
kfConsunmerConf.put("auto.offset.reset", "latest");
// Read String/String records from topic "wypxx1", drop the Kafka metadata, and keep
// only the record values.
PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String> read()
.withBootstrapServers(kafkaAddress)
.withTopics(ImmutableList.of("wypxx1"))
.withKeyCoder(StringUtf8Coder.of())
.withValueCoder(StringUtf8Coder.of())
.updateConsumerProperties(kfConsunmerConf)
.withoutMetadata()
).apply(Values.<String> create());

// --- Map each input string to a KV<String,String> ---
// NOTE(review): the processElement body is missing from the paste, so how the key and
// value are derived from the input string cannot be determined from this snippet.
PCollection<KV<String,String>> totalPc = kafkaJsonPc.apply(
"count line",
ParDo.of(new DoFn<String,KV<String,String>>() {
@ProcessElement
public void processElement(ProcessContext c)
})
);

// --- Apply the fixed window, then group values by key ---
PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
"group by appKey",
GroupByKey.<String, String>create()
);

// --- Format each group, print it, and write a line to a file ---
itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Iterable<String>> keyIt = c.element();
String key = keyIt.getKey();
Iterable<String> itb = keyIt.getValue();
Iterator<String> it = itb.iterator();
StringBuilder sb = new StringBuilder();
sb.append(key).append(":[");
// NOTE(review): the loop body is missing -- presumably it appended each value plus a
// separator to `sb`; the substring call below drops the final character.
while(it.hasNext())
String str = sb.toString();
// Replace the last character with the closing bracket.
str = str.substring(0,str.length() -1) + "]";
System.out.println(str);
// Fixed local path on whichever machine executes this DoFn.
String filePath = "/data/wyp/sparktest.txt";
// NOTE(review): a `+` is missing on each side of `str` in the next line, so the
// expression is not valid Java as written.
String line = "word->["+key+"]total count="str"-->time+"+c.timestamp().toString();
System.out.println("writefile----->"+line);
// FileUtil is not shown here; presumably the boolean flags request append/newline -- TODO confirm.
FileUtil.write(filePath, line, true, true);
}
}));
// Start the pipeline and block until it finishes.
p.run().waitUntilFinish();
When I submit the application to the Spark cluster, I can see the log output for the totalPc PCollection in the Spark UI, but even after one minute I can't see any log output for the itPc PCollection.
When I use Spark in local mode, it works well.
Please help me resolve this problem. Thanks!