Flink / FLINK-20772

RocksDBValueState with TTL throws NullPointerException when update(null) is called


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.11.2
    • Fix Version/s: None
    • Labels:
    • Environment:

      Flink version: 1.11.2

      Flink Cluster: Standalone cluster with 3 Job managers and Task managers on CentOS 7

      Description

      Problem

      • I use ValueState in a custom trigger and set a TTL on that state, with RocksDB as the state backend.
      • The code below throws an error. As far as I know, ValueState.update(null) normally behaves the same as ValueState.clear(), but once TTL is enabled it fails instead.
      // My Code
      ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
      
      • I tested this on Flink 1.11.2, but I think later versions are affected as well.
      • I'm also a beginner, so if anything in this issue is wrong, please let me know and I'll fix it!
      // Error Stacktrace
      Caused by: TimerException{org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB}
      	... 12 more
      Caused by: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
      	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
      	at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
      	at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:102)
      	at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:29)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onProcessingTime(WindowOperator.java:902)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
      	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
      	... 11 more
      Caused by: java.lang.NullPointerException
      	at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:69)
      	at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
      	at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
      	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
      	... 18 more
      

       

      Reason

      • The problem lies in how RocksDBValueState interacts with TtlValueState.
      • In RocksDBValueState (as in the other ValueState implementations), update(null) is supposed to be caught by a null check and turned into clear(). With TTL enabled, however, the argument that reaches this check is never null, so the check is bypassed and the state tries to serialize a null user value.
      // https://github.com/apache/flink/blob/release-1.11/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L96-L110
      
      @Override
      public void update(V value) { 
          if (value == null) { 
              clear(); 
              return; 
          }
       
          try { 
              backend.db.put(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace(), serializeValue(value)); 
          } catch (Exception e) { 
              throw new FlinkRuntimeException("Error while adding data to RocksDB", e);      
          }
      }
      • This happens because TtlValueState wraps the null value together with the last access timestamp, creating a new, non-null TtlValue object whose userValue is null.
      // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L47-L51
      
      @Override
      public void update(T value) throws IOException { 
          accessCallback.run(); 
          original.update(wrapWithTs(value));
      }
      
      // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java#L46-L48
      
      static <V> TtlValue<V> wrapWithTs(V value, long ts) { 
          return new TtlValue<>(value, ts);
      }
      // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
      
      public class TtlValue<T> implements Serializable {
          private static final long serialVersionUID = 5221129704201125020L;
      
          @Nullable
          private final T userValue;
          private final long lastAccessTimestamp;
      
          public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
              this.userValue = userValue;
              this.lastAccessTimestamp = lastAccessTimestamp;
          }
      
          @Nullable
          public T getUserValue() {
              return userValue;
          }
      
          public long getLastAccessTimestamp() {
              return lastAccessTimestamp;
          }
      }
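
The interaction described above can be reproduced outside Flink with a small model. This is only a sketch: `BackendState`, `ttlUpdate`, and the `ByteBuffer`-based `serialize` are hypothetical stand-ins for RocksDBValueState, TtlValueState, and the composite serializer, and only mirror the shape of the code quoted in this issue. It assumes an Integer user value, as the IntSerializer frames in the stack trace suggest.

```java
import java.nio.ByteBuffer;

// Simplified stand-ins for the Flink classes quoted above, NOT the real implementations.
class TtlNullUpdateDemo {

    /** Same shape as TtlValue: a nullable user value plus a timestamp. */
    static final class TtlValue<T> {
        final T userValue;
        final long lastAccessTimestamp;

        TtlValue(T userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Hypothetical backend state: clears on a null argument, otherwise serializes it. */
    static final class BackendState {
        boolean cleared = false;

        void update(TtlValue<Integer> value) {
            if (value == null) { // only catches a null wrapper, not a null userValue
                cleared = true;
                return;
            }
            serialize(value);
        }

        // Writes both fields; unboxing a null Integer throws NullPointerException.
        static byte[] serialize(TtlValue<Integer> value) {
            return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                    .putLong(value.lastAccessTimestamp)
                    .putInt(value.userValue)
                    .array();
        }
    }

    /** Hypothetical TTL wrapper: always wraps, even when the user value is null. */
    static void ttlUpdate(BackendState original, Integer value) {
        original.update(new TtlValue<>(value, System.currentTimeMillis()));
    }

    /** Returns true if update(null) through the TTL wrapper ends in a NullPointerException. */
    static boolean wrappedNullThrows() {
        try {
            ttlUpdate(new BackendState(), null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        BackendState direct = new BackendState();
        direct.update(null); // without the wrapper, the null check works
        System.out.println("direct update(null) cleared state: " + direct.cleared);
        System.out.println("wrapped update(null) throws NPE: " + wrappedNullThrows());
    }
}
```

In this model the direct call takes the clear path, while the wrapped call passes the null check and fails during serialization, which matches the order of frames in the stack trace above.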
      
      • In conclusion, I think the null check needs to be changed so that it also detects a TtlValue whose userValue is null.
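
One way the proposed change could look is to check for null before wrapping, so that the wrapper delegates to clear() on its own. The sketch below shows that idea on a self-contained model. It is not a patch against Flink: `NullSafeTtlState` and `BackendState` are hypothetical names, and the real fix might just as well place the check in the backend state instead.

```java
import java.nio.ByteBuffer;

// Hypothetical model of a null-safe TTL wrapper; NOT the actual Flink classes.
class TtlNullSafeUpdateSketch {

    /** Same shape as TtlValue: a nullable user value plus a timestamp. */
    static final class TtlValue<T> {
        final T userValue;
        final long lastAccessTimestamp;

        TtlValue(T userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Stand-in for the backend state; 'stored' is null while the state is empty. */
    static final class BackendState {
        byte[] stored;

        void update(TtlValue<Integer> value) {
            if (value == null) {
                clear();
                return;
            }
            stored = ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                    .putLong(value.lastAccessTimestamp)
                    .putInt(value.userValue)
                    .array();
        }

        void clear() {
            stored = null;
        }
    }

    /** TTL wrapper that treats update(null) as clear() instead of wrapping null. */
    static final class NullSafeTtlState {
        private final BackendState original;

        NullSafeTtlState(BackendState original) {
            this.original = original;
        }

        void update(Integer value) {
            if (value == null) { // assumed fix: check the user value before wrapping
                original.clear();
                return;
            }
            original.update(new TtlValue<>(value, System.currentTimeMillis()));
        }
    }

    public static void main(String[] args) {
        BackendState backend = new BackendState();
        NullSafeTtlState state = new NullSafeTtlState(backend);
        state.update(5);
        System.out.println("stored after update(5): " + (backend.stored != null));
        state.update(null);
        System.out.println("stored after update(null): " + (backend.stored != null));
    }
}
```

Checking on the wrapper side keeps the backend state unaware of TTL. Until a fix lands, calling clear() explicitly instead of update(null) in user code should sidestep the wrapping step in the same way.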

       

      I hope this helps improve Flink, and if I get the chance, I would like to fix it myself!

      Thank you and have a happy Christmas all!

            People

            • Assignee: Unassigned
            • Reporter: dorbae Seongbae Chang
