FLINK-5756

When there are many values under the same key in ListState, RocksDBStateBackend performs poorly

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.3.0, 1.2.1
    • Labels:
      None
    • Environment:

      CentOS 7.2

      Description

      When using RocksDB as the StateBackend, if there are many values under the same key in ListState, the windowState.get() operation performs very poorly. I also tested RocksDB directly (version 4.11.2), and the performance is just as poor. The problem is likely related to RocksDB's own get() operation after using merge(). This may hurt the performance of window operations backed by ListState when the window size is very large. I tried merging 50,000 values under the same key in RocksDB; executing get() afterwards took 120 seconds.

      ///////////////////////////////////////////////////////////////////////////////
      The Flink code is as follows:

      class SEventSource extends RichSourceFunction [SEvent] {
      
        private var count = 0L
      
        private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
        override def run(sourceContext: SourceContext[SEvent]): Unit = {
          while (true) {
            for (i <- 0 until 5000) {
              sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
              count += 1L
            }
            Thread.sleep(1000)
          }
        }
      }
      
      env.addSource(new SEventSource)
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
              override def getCurrentWatermark: Watermark = {
                new Watermark(System.currentTimeMillis())
              }
      
              override def extractTimestamp(t: SEvent, l: Long): Long = {
                System.currentTimeMillis()
              }
            })
            .keyBy(0)
            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
            .apply(new WindowStatistic)
            .map(x => (System.currentTimeMillis(), x))
            .print()
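
      // Sketch only (assumed, not part of the original report): minimal definitions of the
      // SEvent and WindowStatistic types referenced above. Field names and the window
      // function's output type are illustrative; the real job may differ.
      // Assumed imports: org.apache.flink.api.java.tuple.Tuple,
      // org.apache.flink.streaming.api.scala.function.WindowFunction,
      // org.apache.flink.streaming.api.windowing.windows.TimeWindow,
      // org.apache.flink.util.Collector
      case class SEvent(id: Long, name: String, payload: String, weight: Int)

      class WindowStatistic extends WindowFunction[SEvent, Long, Tuple, TimeWindow] {
        override def apply(key: Tuple, window: TimeWindow,
                           input: Iterable[SEvent], out: Collector[Long]): Unit = {
          // Emit the number of elements collected for this window (the window contents
          // are what the RocksDB-backed ListState has to hand back on get()).
          out.collect(input.size.toLong)
        }
      }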
      

      ////////////////////////////////////
      The RocksDB Test code:

      // Object wrapper, main method, and imports are assumed; the original snippet showed only the method body.
      import org.rocksdb.{CompactionStyle, CompressionType, Options, RocksDB, StringAppendOperator, WriteOptions}

      object RocksDBMergeTest {

        def main(args: Array[String]): Unit = {
          RocksDB.loadLibrary()

          val stringAppendOperator = new StringAppendOperator
          val options = new Options()
          options.setCompactionStyle(CompactionStyle.LEVEL)
            .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
            .setLevelCompactionDynamicLevelBytes(true)
            .setIncreaseParallelism(4)
            .setUseFsync(true)
            .setMaxOpenFiles(-1)
            .setCreateIfMissing(true)
            .setMergeOperator(stringAppendOperator)

          val write_options = new WriteOptions
          write_options.setSync(false)

          val rocksDB = RocksDB.open(options, "/******/Data/")
          val key = "key"
          val value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"

          // Merge 50,000 values under the same key.
          val beginmerge = System.currentTimeMillis()
          for (i <- 0 to 50000) {
            rocksDB.merge(key.getBytes(), ("s" + i + value).getBytes())
            //rocksDB.put(key.getBytes, value.getBytes)
          }
          println("finish")

          // A single get() on the merged key.
          val begin = System.currentTimeMillis()
          rocksDB.get(key.getBytes)
          val end = System.currentTimeMillis()

          println("merge cost:" + (begin - beginmerge))
          println("Time consuming:" + (end - begin))
        }
      }
      

          Activity

          shijinkui added a comment -

          Hi Stephan Ewen,
          do we have any tuning techniques for this problem, which originates from RocksDB's get()?

          Stephan Ewen added a comment - edited

          Thanks for opening this and sharing the test results.
          I agree that the performance of RocksDB is not optimal and that we would like to get better performance out of the state backend. In general, RocksDB is heavily optimized for writes and for small values; larger values (as you get with the merge) perform very badly.

          Here are a few things we can do and have already started doing:

          Improve the other state backends

          • We are currently making the in-memory state backend (object data) much stronger, with async snapshots (see FLINK-5715 )
          • It makes sense to eventually build our own state backend that operates on serialized data with managed memory

          Optimize the RocksDB State Backend

          • We can try to avoid RocksDB's merge operation and instead use range iterators for ListState.
          • A quick benchmark of the same task with that approach gives 91 ms insert time and 35 ms get() time. That looks worth exploring.

          Code for range-iterator mini-benchmark

          		final File rocksDir = new File("/tmp/rdb");
          		FileUtils.deleteDirectory(rocksDir);
          
          		final Options options = new Options()
          				.setCompactionStyle(CompactionStyle.LEVEL)
          				.setLevelCompactionDynamicLevelBytes(true)
          				.setIncreaseParallelism(4)
          				.setUseFsync(false)
          				.setMaxOpenFiles(-1)
          				.setAllowOsBuffer(true)
          				.setDisableDataSync(true)
          				.setCreateIfMissing(true)
          				.setMergeOperator(new StringAppendOperator());
          
          		final WriteOptions write_options = new WriteOptions()
          				.setSync(false)
          				.setDisableWAL(true);
          
          		final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
          
          		final String key = "key";
          		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
          
          		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
          
          		final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
          
          		final Unsafe unsafe = MemoryUtils.UNSAFE;
          		final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
          
          		final int num = 50000;
          		System.out.println("begin insert");
          
          		final long beginInsert = System.nanoTime();
          		for (int i = 0; i < num; i++) {
          			unsafe.putInt(keyTemplate, offset, i);
          			rocksDB.put(write_options, keyTemplate, valueBytes);
          		}
          		final long endInsert = System.nanoTime();
          		System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
          
          		final byte[] resultHolder = new byte[num * valueBytes.length];
          
          		final long beginGet = System.nanoTime();
          
          		final RocksIterator iterator = rocksDB.newIterator();
          		int pos = 0;
          
          		// seek to start
          		unsafe.putInt(keyTemplate, offset, 0);
          		iterator.seek(keyTemplate);
          
          		// mark end
          		unsafe.putInt(keyTemplate, offset, -1);
          
          		// iterate
          		while (iterator.isValid()) {
          			byte[] currKey = iterator.key();
          			if (sameKey(keyBytes, currKey)) {
          				byte[] currValue = iterator.value();
          				System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
          				pos += currValue.length;
          				iterator.next();
          			}
          			else {
          				break;
          			}
          		}
          		
          		final long endGet = System.nanoTime();
          
          		System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
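
          		// Note: the sameKey(...) helper used above is not shown in the original
          		// comment. A plausible sketch (an assumption, not the original code):
          		// iterator keys are the queried key bytes plus a 4-byte index suffix,
          		// so "same key" means the candidate starts with the key as a prefix.
          		private static boolean sameKey(byte[] keyPrefix, byte[] candidate) {
          			if (candidate == null || candidate.length < keyPrefix.length) {
          				return false;
          			}
          			for (int i = 0; i < keyPrefix.length; i++) {
          				if (candidate[i] != keyPrefix[i]) {
          					return false;
          				}
          			}
          			return true;
          		}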
          
          shijinkui added a comment -

          Stephan Ewen, thanks for your reply. Syinchwun Leo, please test the mini-benchmark.
          FLINK-5715 is nice.

          Stephan Ewen added a comment -

          I raised the issue at RocksDB: https://github.com/facebook/rocksdb/issues/1988

          Stephan Ewen added a comment -

          I would suggest waiting to see whether we get a response from the RocksDB community. If we cannot expect a fix soon, we will have to work around it using the "range iterator" approach described above.

          Syinchwun Leo added a comment -

          OK. This problem affects not only the performance of UDF windows but also checkpointing: poor window performance leaves many tuples waiting in the IO buffers, so barriers cannot be processed in a timely manner. This may result in checkpoint failures.

          Syinchwun Leo added a comment - edited

          Is it possible to avoid the merge() operation? I notice that the result of RocksDB's get() is a byte array. My point is that when calling the add() method of RocksDBListState, we could call get() first to obtain the byte array, append the new value's serialized byte[] to it, and then put it back into RocksDB. That way there would only ever be one byte[] under the key. I haven't tested the idea; the performance may not be great.

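          For illustration, a minimal standalone sketch of that get / append / put idea against the plain RocksJava API (an assumption for discussion, not Flink's actual RocksDBListState code; class and method names are illustrative, and a real implementation would also need a delimiter or length prefix so the list can be split apart again):

              import java.io.ByteArrayOutputStream;
              import java.io.IOException;

              import org.rocksdb.RocksDB;
              import org.rocksdb.RocksDBException;

              public final class GetAppendPut {

                  // Append one serialized element to the single byte[] stored under the key,
                  // instead of issuing a RocksDB merge().
                  static void add(RocksDB db, byte[] key, byte[] serializedElement)
                          throws RocksDBException, IOException {
                      byte[] current = db.get(key);            // null if the key is absent
                      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                      if (current != null) {
                          buffer.write(current);
                      }
                      buffer.write(serializedElement);
                      db.put(key, buffer.toByteArray());       // write the grown list back
                  }
              }

          Note that every add() then re-reads and rewrites the entire list, so the cost of a single add() grows with the list size.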
          Wenlong Lyu added a comment -

          In RocksDB, the merge operands are actually combined during compaction and during get(), not when merge() is called. When merging two slices with a StringAppendOperator, a new string has to be created, which can be time-costly when there are thousands of slices to merge.

          I think that is why it is slow to get the value after adding thousands of items to the list. If you call rocksdb.compactRange() before get(), it will be quite fast.

          In real application scenarios, compaction happens more often than in this test, so the performance will be much better in a real environment, except in extreme scenarios like this one.

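          As a reference point, a minimal standalone sketch of that suggestion (an assumed test against the plain RocksJava API, mirroring the merge test from the description; the path and class name are illustrative):

              import java.nio.charset.StandardCharsets;

              import org.rocksdb.Options;
              import org.rocksdb.RocksDB;
              import org.rocksdb.RocksDBException;
              import org.rocksdb.StringAppendOperator;

              public class CompactBeforeGet {
                  public static void main(String[] args) throws RocksDBException {
                      RocksDB.loadLibrary();
                      Options options = new Options()
                              .setCreateIfMissing(true)
                              .setMergeOperator(new StringAppendOperator());
                      RocksDB db = RocksDB.open(options, "/tmp/rocksdb-compact-test");

                      byte[] key = "key".getBytes(StandardCharsets.UTF_8);
                      for (int i = 0; i < 50_000; i++) {
                          db.merge(key, ("value-" + i).getBytes(StandardCharsets.UTF_8));
                      }

                      // Collapse the accumulated merge operands once, up front, so the
                      // subsequent get() does not have to replay the whole merge chain.
                      db.compactRange();

                      long begin = System.currentTimeMillis();
                      byte[] merged = db.get(key);
                      System.out.println("get() after compactRange(): "
                              + (System.currentTimeMillis() - begin) + " ms, "
                              + merged.length + " bytes");

                      // db and options should be released afterwards; the exact API
                      // (dispose()/close()) differs across RocksJava versions.
                  }
              }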
          Stephan Ewen added a comment -

          Syinchwun Leo: I tried the variant with manual get / concatenate / put round trips. The performance was even worse than in the case above.

          Wenlong Lyu: It is true that in many cases the periodic compactions will help, but we cannot guarantee that. Manually triggering compactRange() is an interesting idea. Do you have experience with when/how to best do that?

          Stephan Ewen added a comment -

          Just validated that compactions actually do help, but the compactions themselves are equally slow when many values are merged.

          It is also the case that repeated gets on the same key take long, not only the first get.

          Stephan Ewen added a comment -

          We have found an approach in RocksDB that may increase the performance here. Need to verify, will then report back...

          Stephan Ewen added a comment -

          Added a mini-benchmark for 'list operations' on top of RocksDB in 677b508a962c5c7df9308ac3531e799cddec27f6

          ASF GitHub Bot added a comment -

          GitHub user rmetzger opened a pull request:

          https://github.com/apache/flink/pull/3704

          FLINK-5756 Replace RocksDB dependency with FRocksDB

          @StefanRRichter has created a custom RocksDB release that fixes FLINK-5756.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/rmetzger/flink flink5756

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3704.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3704


          commit b05c595a37ea5b3a08ef4a11d9259eb7aabee005
          Author: Robert Metzger <rmetzger@apache.org>
          Date: 2017-04-09T20:05:08Z

          FLINK-5756 Replace RocksDB dependency with FRocksDB


          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3704

          LGTM. +1 for merging this.

          ASF GitHub Bot added a comment -

          Github user SyinChwunLeo commented on the issue:

          https://github.com/apache/flink/pull/3704

          I notice that RocksDB 4.11.2 depends on zlib-1.2.8; unfortunately, there is a security vulnerability in zlib-1.2.8 (https://bugzilla.redhat.com/show_bug.cgi?id=CVE-2016-9840), which is fixed in zlib-1.2.9. Only RocksDB's latest version, 5.2.1, uses zlib-1.2.9. Is it possible to update RocksDB to 5.2.1 in Flink?

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3704

          Unfortunately, all newer versions of RocksDB are broken for some functions when used through the Java API and will segfault when using merge operators. This is the same version as we previously used in Flink, so there is no regression. Furthermore, it is possible to use a different compression scheme. We might at some point try to build our custom version against zlib-1.2.9.

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3704

          @SyinChwunLeo We will try and contribute the patch to RocksDB and will also soon try and move to a newer RocksDB version, as soon as its Java API works again for the required functions. The RocksDB folks mentioned that the next release of RocksDB is quite soon and should fix that.

          That will hopefully address the issue. Until then, we cannot upgrade, unfortunately.

          ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3704

          Could you tell us what modifications were made in "FRocksDB" and post the URL of the source code repository?

          ASF GitHub Bot added a comment -

          Github user SyinChwunLeo commented on the issue:

          https://github.com/apache/flink/pull/3704

          OK, is FRocksDB only available on the master branch, or can we also use it with flink-1.2.0?

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3704

          We are using the newer StringAppendTESTOperator over the StringAppendOperator in rocksdbjni, because the merge algorithm used in StringAppendOperator has quadratic complexity in the number of merged elements. The modification is just instantiating the other implementation by providing a different parameter to a factory method.

          The source code is here: https://github.com/dataArtisans/frocksdb/tree/master


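          To make the complexity point concrete, a small self-contained Java sketch (illustrative only, not RocksDB's actual operator code): re-copying the accumulated result on every append is quadratic in the number of operands, while gathering into a single growing buffer and emitting once is roughly linear.

              import java.util.ArrayList;
              import java.util.List;

              // Toy model of the two merge strategies: repeated pairwise appends copy all
              // previously merged bytes on every step (O(n^2) total work), while gathering
              // the operands into one buffer touches every byte only a constant number of times.
              public class MergeComplexityDemo {

                  static String quadraticMerge(List<String> operands) {
                      String merged = "";
                      for (String op : operands) {
                          merged = merged + "," + op;    // copies the whole accumulated string each time
                      }
                      return merged;
                  }

                  static String linearMerge(List<String> operands) {
                      StringBuilder merged = new StringBuilder();
                      for (String op : operands) {
                          merged.append(',').append(op); // amortized cost proportional to op length
                      }
                      return merged.toString();
                  }

                  public static void main(String[] args) {
                      List<String> operands = new ArrayList<>();
                      for (int i = 0; i < 50_000; i++) {
                          operands.add("value-" + i);
                      }
                      long t0 = System.currentTimeMillis();
                      quadraticMerge(operands);
                      long t1 = System.currentTimeMillis();
                      linearMerge(operands);
                      long t2 = System.currentTimeMillis();
                      System.out.println("quadratic: " + (t1 - t0) + " ms, linear: " + (t2 - t1) + " ms");
                  }
              }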
          ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3704

          @SyinChwunLeo: we'll merge the fix to both master and the release 1.2 branch.
          But you can start using FRocksDB right away, because it is available in Maven Central.

          You just have to exclude the rocksdb dependency from Flink and include frocksdb:
          ```diff
           <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
          +  <exclusions>
          +    <exclusion>
          +      <groupId>org.rocksdb</groupId>
          +      <artifactId>rocksdbjni</artifactId>
          +    </exclusion>
          +  </exclusions>
          +</dependency>
          +<dependency>
          +  <groupId>com.data-artisans</groupId>
          +  <artifactId>frocksdbjni</artifactId>
          +  <version>4.11.2-artisans</version>
           </dependency>
           <dependency>
             <groupId>org.apache.flink</groupId>
          ```

          ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3704

          I'll merge the change now

          Robert Metzger added a comment -

          Merged change from rocksdb to frocksdb for the 1.2.1 release in http://git-wip-us.apache.org/repos/asf/flink/commit/6e903bfa

          Robert Metzger added a comment -

          Merged frocksdb to 1.3-SNAPSHOT in http://git-wip-us.apache.org/repos/asf/flink/commit/e651df99
          ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3704

          ASF GitHub Bot added a comment -

          Github user hsaputra commented on the issue:

          https://github.com/apache/flink/pull/3704

          I hope there is a plan to contribute this back to RocksDB so that we can eventually move back to depending on the fixed upstream version. Relying on a modified library is never good practice.

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3704

          @hsaputra There is a plan to do that and we are in touch with the RocksDB folks. The latest RocksDB master does not work for Flink though, currently, so we needed a custom backport to an earlier RocksDB version...

          ASF GitHub Bot added a comment -

          Github user hsaputra commented on the issue:

          https://github.com/apache/flink/pull/3704

          Yeah, I totally understand the reason. Just want to make sure we have a plan to move back as soon as RocksDB is usable for Flink again.

          Stefan Richter added a comment -

          fixed in e651df99f4


            People

            • Assignee:
              Unassigned
            • Reporter:
              Syinchwun Leo
            • Votes:
              0
            • Watchers:
              15

              Dates

              • Created:
                Updated:
                Resolved:
