Flink / FLINK-10915

Clojure: context.collectWithTimestamp will block

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: 1.6.2
    • Fix Version/s: None
    • Component/s: Command Line Client
    • Labels: None

      Description

      (deftype Cflatmapfunction []
        FlatMapFunction
        (flatMap [this value collector]
          (log/info "value:" (type value) value)
          (let [tomap (into {} value)
                {:keys [shopid shopname]} (ym/readstring (get tomap "body"))]
            ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
            (.collect collector (Tuple3. shopid shopname (int 1))))))
      
      ;;; The problem is here: a FlatMapFunction implemented in Clojure blocks when run on a cluster, but runs fine in a local JVM.
      
      
      (defn -main [& args]
        (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
              _ (.enableCheckpointing flink-env 13000)
              sources (.addSource flink-env
                                  (RocketMQSource. (SimpleKeyValueDeserializationSchema. "msgid" "body") 
                                                   (gen-consumer-properties)))
              _ (.name sources "ririri")
              _ (.setParallelism sources 1)
              ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
              _ (.name ednds "ccc")
              _ (.setParallelism ednds 1)
              _ (.print ednds)
              ;counts (.sum (.timeWindow(.keyBy ednds (int-array [0])) (Time/seconds 10))  2)
              ;secondcounts (.sum (.timeWindow(.keyBy counts (int-array [0])) (Time/minutes 2))  2)
              ]
          (prn "Starting stateful stream computation 1" flink-env)
          ;(.setParallelism ds 1)
          ;(.setParallelism ednds 1)
          ;(.print counts)
          ;(.print secondcounts)
          (.execute flink-env "rocketmq-flink-feng2")))
      
      

       

            People

            • Assignee: twalthr Timo Walther
            • Reporter: xingzhe fengge
            • Votes: 0
            • Watchers: 2
