Flink / FLINK-5801

Queryable State from Scala job/client fails with key of type Long

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 1.2.0
    • None
    • None
    • Environment: Flink 1.2.0, Scala 2.10

    Description

      While working on a demo Flink job to try out Queryable State, I exposed some state of type Long -> custom class via the query server. However, the query server returned an exception when I sent a query:

      Exception in thread "main" java.lang.RuntimeException: Failed to query state backend for query 0. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
      	at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeKeyAndNamespace(KvStateRequestSerializer.java:392)
      	at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:130)
      	at org.apache.flink.runtime.query.netty.KvStateServerHandler$AsyncKvStateQueryTask.run(KvStateServerHandler.java:220)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.EOFException
      	at org.apache.flink.runtime.util.DataInputDeserializer.readLong(DataInputDeserializer.java:217)
      	at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:69)
      	at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:27)
      	at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeKeyAndNamespace(KvStateRequestSerializer.java:379)
      	... 7 more
      
      	at org.apache.flink.runtime.query.netty.KvStateServerHandler$AsyncKvStateQueryTask.run(KvStateServerHandler.java:257)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      

      I banged my head against this for a while. Then, per jgrier's suggestion, I tried simply changing the key from Long to String (modifying the two keyBy calls and the keySerializer TypeHint in the attached code), and it started working perfectly.
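The EOFException raised from LongSerializer.deserialize in the trace above is consistent with the server expecting a fixed 8-byte long while the client sent fewer bytes for the key. The following is a minimal, Flink-free sketch of that failure mode using only the JDK. The object name and the writeCompact helper are hypothetical stand-ins for whatever serializer the client side actually derived; this is not Flink's code, and a real request also carries a namespace after the key.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

object KeySerializerMismatchSketch {

  // Hypothetical "client-side" encoding: a compact variable-length form,
  // standing in for any serializer that is NOT the fixed 8-byte long format.
  def writeCompact(value: Long): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    var v = value
    while ((v & ~0x7FL) != 0L) {
      bytes.write(((v & 0x7FL) | 0x80L).toInt) // low 7 bits plus continuation flag
      v >>>= 7
    }
    bytes.write(v.toInt)
    bytes.toByteArray
  }

  // "Server-side" format: the fixed 8-byte encoding that DataInput.readLong expects.
  def writeFixed(value: Long): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    new DataOutputStream(bytes).writeLong(value)
    bytes.toByteArray
  }

  // Server-side decode: reads exactly 8 bytes, or throws EOFException if fewer are available.
  def readFixed(data: Array[Byte]): Long =
    new DataInputStream(new ByteArrayInputStream(data)).readLong()

  def main(args: Array[String]): Unit = {
    val key = 42L
    println(s"fixed encoding length:   ${writeFixed(key).length}")
    println(s"compact encoding length: ${writeCompact(key).length}")
    println(s"matching formats round-trip: ${readFixed(writeFixed(key)) == key}")
    try {
      readFixed(writeCompact(key))
    } catch {
      case _: EOFException =>
        println("mismatched formats: EOFException while reading the long")
    }
  }
}
```

If this is what happened here, comparing the class of the key serializer built in the client with the one the job uses for its keyBy key would be a reasonable first check before changing the key type.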

      cc uce

      Attachments

        1. OrderFulfillment.scala
          6 kB
          Patrick Lucas
        2. OrderFulfillmentStateQuery.scala
          2 kB
          Patrick Lucas

          People

            Assignee: Ufuk Celebi (uce)
            Reporter: Patrick Lucas (plucas)
            Votes: 0
            Watchers: 2
