Flink / FLINK-11738

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher15e85f5d-a55d-4773-8197-f0db5658f55b#1335897563]] after [10000 ms]. Sender[null] sent


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.7.2
    • Fix Version/s: None
    • Component/s: Command Line Client
    • Labels:
      None
    • Environment:

      Description

      The akka.ask.timeout is 10 seconds. In the MiniCluster environment this value is hard-coded; is there no way to change it?

      ---------------------------------------------------------------------------------------------------------------------------------

      org.apache.flink.runtime.minicluster.MiniCluster

      /**
       * Creates a new Flink mini cluster based on the given configuration.
       *
       * @param miniClusterConfiguration The configuration for the mini cluster
       */
      public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
          this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null");
          this.rpcTimeout = Time.seconds(10L);
          this.terminationFuture = CompletableFuture.completedFuture(null);
          running = false;
      }

      ---------------------------------------------------------------------------------------------------------------------------------
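      The constructor quoted above fixes rpcTimeout at 10 seconds regardless of the configuration passed in. As a rough illustration of the behaviour the report is asking for, the following is a minimal, self-contained sketch in which the timeout is read from the supplied settings and only falls back to 10 seconds when nothing is set. It is not Flink's code: the class RpcTimeoutSketch, the helper parseSeconds, and the use of a plain Map for settings are all hypothetical, and only values of the assumed form "<number> s" are handled.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a mini cluster; this is NOT Flink's MiniCluster. */
public class RpcTimeoutSketch {

    /** Configuration key named in the issue description. */
    static final String ASK_TIMEOUT_KEY = "akka.ask.timeout";

    /** Fallback matching the 10 seconds reported in the exception. */
    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);

    private final Duration rpcTimeout;

    /** Reads the timeout from the settings instead of hard-coding it. */
    RpcTimeoutSketch(Map<String, String> settings) {
        String raw = settings.get(ASK_TIMEOUT_KEY);
        this.rpcTimeout = (raw == null) ? DEFAULT_TIMEOUT : parseSeconds(raw);
    }

    /** Parses values of the assumed form "<number> s", for example "60 s". */
    static Duration parseSeconds(String raw) {
        String trimmed = raw.trim();
        if (!trimmed.endsWith("s")) {
            throw new IllegalArgumentException("Expected a value such as \"60 s\", got: " + raw);
        }
        // Drop the trailing unit and parse what remains as whole seconds.
        String digits = trimmed.substring(0, trimmed.length() - 1).trim();
        return Duration.ofSeconds(Long.parseLong(digits));
    }

    Duration getRpcTimeout() {
        return rpcTimeout;
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        // No key set: the fallback applies.
        System.out.println(new RpcTimeoutSketch(settings).getRpcTimeout().getSeconds());

        // Key set: the configured value is used.
        settings.put(ASK_TIMEOUT_KEY, "60 s");
        System.out.println(new RpcTimeoutSketch(settings).getRpcTimeout().getSeconds());
    }
}
```

      With a design along these lines, a job that needs more than 10 seconds for an ask could raise the limit through configuration rather than by patching the class.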

       

       

       


       

      package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc

      import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
      import org.apache.flink.streaming.api.windowing.time.Time

      /**
       * Input data is supplied with: nc -lk 1234
       */
      object SocketWindowWordCount {

        def main(args: Array[String]): Unit = {

          val port = 1234
          // get the execution environment
          val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

          // get input data by connecting to the socket
          val dataStream = env.socketTextStream("localhost", port, '\n')

          import org.apache.flink.streaming.api.scala._
          // split each line on whitespace and pair every word with a count of 1
          val textResult = dataStream.flatMap(w => w.split("\\s")).map(w => WordWithCount(w, 1))
            .keyBy("word")
            /*
             * The window is refreshed every 5 seconds, which effectively restarts the count.
             * Benefit: there is no need to keep aggregating over all the data;
             * only the incremental data within the given interval is needed, which reduces the data volume.
             */
            .timeWindow(Time.seconds(5))
            .sum("count")

          textResult.print().setParallelism(1)

          // use the first argument as the job name if one is given
          if (args == null || args.size == 0) {
            env.execute("Default job")
          } else {
            env.execute(args(0))
          }

          println("Finished")

        }

        // Data type for words with count
        case class WordWithCount(word: String, count: Long)

      }

       

      ---------------------------------------------------------------------------------------------------------------------------------

        Attachments

        1. image-2019-02-25-13-11-13-723.png
          262 kB
          thinktothings
        2. image-2019-02-28-14-05-51-875.png
          246 kB
          thinktothings


            People

            • Assignee:
              Unassigned
            • Reporter:
              thinktothings
            • Votes:
              0
            • Watchers:
              2
