Beam / BEAM-1789

Can't use Window in Spark cluster mode

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: runner-spark
    • Labels: None

    Description

I use Beam in a Spark cluster. The application is below.
      SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
      options.setRunner(SparkRunner.class);
      options.setEnableSparkMetricSinks(false);
      options.setStreaming(true);
      options.setSparkMaster("spark://10.100.124.205:6066");
      options.setAppName("Beam App Spark"+new Random().nextFloat());
      options.setJobName("Beam Job Spark"+new Random().nextFloat());
      System.out.println("App Name:"+options.getAppName());
      System.out.println("Job Name:"+options.getJobName());
      options.setMaxRecordsPerBatch(100000L);

      // PipelineOptions options = PipelineOptionsFactory.create();
      Pipeline p = Pipeline.create(options);

      // Duration size = Duration.standardMinutes(4);
      long duration = 60;
      if (args != null && args.length == 1) {
        duration = Integer.valueOf(args[0]);
      }

      Duration size = Duration.standardSeconds(duration);
      System.out.println("时间窗口为:["+duration+"]秒");
      Window.Bound<KV<String, String>> fixWindow =
          Window.<KV<String, String>>into(FixedWindows.of(size));
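      // Note: with the default trigger, each fixed window fires once the
      // watermark passes the end of the window, so the grouped output is
      // expected roughly `duration` seconds after the first elements arrive.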

      String kafkaAddress = "10.100.124.208:9093";
      // String kafkaAddress = "192.168.100.212:9092";

      Map<String, Object> kfConsumerConf = new HashMap<String, Object>();
      kfConsumerConf.put("auto.offset.reset", "latest");
      PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
          .withBootstrapServers(kafkaAddress)
          .withTopics(ImmutableList.of("wypxx1"))
          .withKeyCoder(StringUtf8Coder.of())
          .withValueCoder(StringUtf8Coder.of())
          .updateConsumerProperties(kfConsumerConf)
          // withoutMetadata() drops the Kafka metadata, yielding KV<String, String>
          .withoutMetadata())
          .apply(Values.<String>create());

      PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
          "count line",
          ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              String line = c.element();
              Instant is = c.timestamp();
              if (line.length() > 2) {
                line = line.substring(0, 2);
              }
              System.out.println(line + " " + is.toString());
              c.output(KV.of(line, line));
            }
          }));

      PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
          "group by appKey",
          GroupByKey.<String, String>create());
      itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          KV<String, Iterable<String>> keyIt = c.element();
          String key = keyIt.getKey();
          Iterable<String> itb = keyIt.getValue();
          Iterator<String> it = itb.iterator();
          StringBuilder sb = new StringBuilder();
          sb.append(key).append(":[");
          while (it.hasNext()) {
            sb.append(it.next()).append(",");
          }
          String str = sb.toString();
          // Replace the trailing comma with the closing bracket.
          str = str.substring(0, str.length() - 1) + "]";
          System.out.println(str);
          String filePath = "/data/wyp/sparktest.txt";
          String line = "word->[" + key + "] total count=" + str + " -->time " + c.timestamp().toString();
          System.out.println("writefile----->" + line);
          FileUtil.write(filePath, line, true, true);
        }
      }));

      p.run().waitUntilFinish();

      When I submit the application to the Spark cluster, in the Spark UI I can see the log output from the totalPc PCollection, but even after one minute I can't see any log output from the itPc PCollection.
      When I run Spark in local mode, it works well.
      Please help me resolve this problem. Thanks!
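      In cluster mode, System.out calls inside a DoFn execute on the executors, so their output goes to the per-executor stdout logs rather than the driver console. Below is a minimal sketch, assuming an SLF4J binding is on the classpath (the class and logger names are illustrative), that routes the grouped output through the executor log files instead:

      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.values.KV;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      // Logs each grouped window pane through SLF4J so the messages are
      // captured in the executor logs (visible per executor in the Spark UI)
      // instead of relying on stdout reaching the driver.
      class LogGroupedFn extends DoFn<KV<String, Iterable<String>>, Void> {
        private static final Logger LOG = LoggerFactory.getLogger(LogGroupedFn.class);

        @ProcessElement
        public void processElement(ProcessContext c) {
          LOG.info("key={} timestamp={} values={}",
              c.element().getKey(), c.timestamp(), c.element().getValue());
        }
      }

      The anonymous DoFn on itPc could then be swapped for itPc.apply(ParDo.of(new LogGroupedFn())).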


People

    • Assignee: Unassigned
    • Reporter: tianyou
    • Votes: 0
    • Watchers: 3
