Flink / FLINK-5184

Error result of compareSerialized in RowComparator class


Details

    Description

      RowSerializer writes a null mask covering all fields of a row before it serializes the row data to the DataOutputView.

      RowSerializer.scala
      override def serialize(value: Row, target: DataOutputView) {
          val len = fieldSerializers.length
      
          if (value.productArity != len) {
            throw new RuntimeException("Row arity of value does not match serializers.")
          }
      
          // write a null mask
          writeNullMask(len, value, target)
      
          // ......
      }
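To see why the field count matters on the read side, here is a minimal sketch of a null mask writer. It assumes the mask packs one flag per field, 8 fields per byte, most significant bit first, so a row with `len` fields produces ceil(len / 8) bytes. The object name `NullMaskWriteSketch` and the `Array[Any]` row model are hypothetical stand-ins, not Flink's actual `writeNullMask` or `Row`.

```scala
import java.io.{ByteArrayOutputStream, DataOutput, DataOutputStream}

object NullMaskWriteSketch {
  // Hypothetical stand-in for writeNullMask (assumed layout): one bit per field,
  // packed 8 fields per byte, most significant bit first; a set bit means null.
  // A row is modeled as Array[Any] rather than Flink's Row type.
  def writeNullMask(len: Int, fields: Array[Any], target: DataOutput): Unit = {
    var fieldPos = 0
    while (fieldPos < len) {
      val numBits = math.min(8, len - fieldPos)
      var b = 0
      var i = 0
      while (i < numBits) {
        b <<= 1
        if (fields(fieldPos + i) == null) b |= 0x01
        i += 1
      }
      b <<= (8 - numBits) // left-align the bits in a partially filled last byte
      target.writeByte(b)
      fieldPos += numBits
    }
  }

  def main(args: Array[String]): Unit = {
    val row: Array[Any] = Array(1, null, "a", null, 5L, 6, 7, 8, null, 10)
    val bytes = new ByteArrayOutputStream()
    writeNullMask(row.length, row, new DataOutputStream(bytes))
    // 10 fields need 2 mask bytes: 0x50 (fields 1 and 3 null), 0x80 (field 8 null)
    println(bytes.toByteArray.map(b => f"0x${b & 0xff}%02x").mkString(" "))
  }
}
```

Under this layout the mask length depends on the full row arity, so any reader has to know that arity to skip past the mask correctly.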
      
      

      RowComparator deserializes row data from the DataInputView when compareSerialized is called. However, the first argument passed to readIntoNullMask is wrong. It should be the number of fields in the row, since RowSerializer wrote the null mask for all fields, not the length of serializers, which covers only the first n fields that are deserialized for comparison.

      RowComparator.scala
      override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
          val len = serializers.length
          val keyLen = keyPositions.length
      
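          // Bug: len is serializers.length (only the fields needed for comparison),
          // but the null mask was written for all fields of the row.
          readIntoNullMask(len, firstSource, nullMask1)
          readIntoNullMask(len, secondSource, nullMask2)
          // ......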
      }
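The effect of reading the mask with the wrong count can be sketched as well. Assuming the mask packs 8 null flags per byte, most significant bit first, a reader that passes the number of key fields instead of the row arity consumes too few mask bytes whenever the two counts round up to different byte counts, and the next field is then decoded starting inside the mask. The object `NullMaskMisalignment`, its helpers, and the hand-built record are hypothetical; `keyFields` plays the role of `serializers.length` in `RowComparator`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutputStream}

object NullMaskMisalignment {
  // Hypothetical stand-in for readIntoNullMask: consumes ceil(len / 8) bytes,
  // assuming 8 null flags per byte, most significant bit first.
  def readIntoNullMask(len: Int, source: DataInput, nullMask: Array[Boolean]): Unit = {
    var fieldPos = 0
    while (fieldPos < len) {
      val numBits = math.min(8, len - fieldPos)
      var b = source.readUnsignedByte()
      var i = 0
      while (i < numBits) {
        nullMask(fieldPos + i) = (b & 0x80) != 0
        b <<= 1
        i += 1
      }
      fieldPos += numBits
    }
  }

  // Hand-built record for a 10-field row: a 2-byte null mask (field 8 is null)
  // followed by the serialized first field, an Int with value 42.
  def record(): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeByte(0x00) // fields 0-7: none null
    out.writeByte(0x80) // fields 8-9: field 8 null
    out.writeInt(42)    // value of field 0
    bytes.toByteArray
  }

  // Reads a null mask for `maskLen` fields, then decodes the first field.
  def readFirstKey(maskLen: Int, data: Array[Byte]): Int = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    readIntoNullMask(maskLen, in, new Array[Boolean](maskLen))
    in.readInt()
  }

  def main(args: Array[String]): Unit = {
    val arity = 10    // fields in the row, as written by the serializer
    val keyFields = 3 // analogous to serializers.length in RowComparator
    println(readFirstKey(keyFields, record())) // mask read too short: prints -2147483648
    println(readFirstKey(arity, record()))     // full mask consumed: prints 42
  }
}
```

In this sketch, passing the row arity consumes the whole mask and keeps the stream aligned, but the `nullMask` array must then have room for every field of the row, not just the key fields. When both counts round up to the same number of bytes (for example 5 fields with 3 keys), the sketch reads the same bytes and the leading flags are still correct, so the mismatch would only surface for wider rows.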
      


          People

            godfreyhe (godfrey he)
