FLINK-6210: RocksDB instance should be closed in ListViaMergeSpeedMiniBenchmark

    Details

      Description

      The RocksDB instance should be closed before main() returns.

      ListViaRangeSpeedMiniBenchmark has the same issue.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user fanyon opened a pull request:

          https://github.com/apache/flink/pull/3652

          FLINK-6210 close RocksDB in ListViaMergeSpeedMiniBenchmark && ListV…

          close RocksDB in ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fanyon/flink FLINK-6210

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3652.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3652


          commit e0044c64fd10c2b54ec1b61e41812f6bbf3723ae
          Author: mengji.fy <mengji.fy@taobao.com>
          Date: 2017-03-30T03:12:41Z

          FLINK-6210 close RocksDB in ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark


          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3652#discussion_r108869459

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java ---
          @@ -54,59 +54,62 @@ public static void main(String[] args) throws Exception {

            final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());

          - final String key = "key";
          - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
          + try {
          +     final String key = "key";
          +     final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

          - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
          +     final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          +     final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);

          - final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
          +     final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);

          - final Unsafe unsafe = MemoryUtils.UNSAFE;
          - final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
          +     final Unsafe unsafe = MemoryUtils.UNSAFE;
          +     final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;

          - final int num = 50000;
          - System.out.println("begin insert");
          +     final int num = 50000;
          +     System.out.println("begin insert");

          - final long beginInsert = System.nanoTime();
          - for (int i = 0; i < num; i++) {
          -     unsafe.putInt(keyTemplate, offset, i);
          -     rocksDB.put(write_options, keyTemplate, valueBytes);
          - }
          - final long endInsert = System.nanoTime();
          - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
          -
          - final byte[] resultHolder = new byte[num * valueBytes.length];
          -
          - final long beginGet = System.nanoTime();
          -
          - final RocksIterator iterator = rocksDB.newIterator();
          - int pos = 0;
          -
          - // seek to start
          - unsafe.putInt(keyTemplate, offset, 0);
          - iterator.seek(keyTemplate);
          -
          - // mark end
          - unsafe.putInt(keyTemplate, offset, -1);
          -
          - // iterate
          - while (iterator.isValid()) {
          -     byte[] currKey = iterator.key();
          -     if (samePrefix(keyBytes, currKey)) {
          -         byte[] currValue = iterator.value();
          -         System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
          -         pos += currValue.length;
          -         iterator.next();
          +     final long beginInsert = System.nanoTime();
          +     for (int i = 0; i < num; i++) {
          +         unsafe.putInt(keyTemplate, offset, i);
          +         rocksDB.put(write_options, keyTemplate, valueBytes);
                }
          -     else {
          -         break;
          +     final long endInsert = System.nanoTime();
          +     System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
          +
          +     final byte[] resultHolder = new byte[num * valueBytes.length];
          +
          +     final long beginGet = System.nanoTime();
          +
          +     final RocksIterator iterator = rocksDB.newIterator();
          +     int pos = 0;
          +
          +     // seek to start
          +     unsafe.putInt(keyTemplate, offset, 0);
          +     iterator.seek(keyTemplate);
          +
          +     // mark end
          +     unsafe.putInt(keyTemplate, offset, -1);
          +
          +     // iterate
          +     while (iterator.isValid()) {
          +         byte[] currKey = iterator.key();
          +         if (samePrefix(keyBytes, currKey)) {
          +             byte[] currValue = iterator.value();
          +             System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
          +             pos += currValue.length;
          +             iterator.next();
          +         } else {
          +             break;
          +         }
                }
          - }

          - final long endGet = System.nanoTime();
          +     final long endGet = System.nanoTime();

          - System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
          +     System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
          + } finally {
          +     rocksDB.close();
          --- End diff --

          As with the problem mentioned above, all RocksDB objects should be closed once they are not used.

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3652#discussion_r108868469

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java ---
          @@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception {

            final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());

          - final String key = "key";
          - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
          + try {
          --- End diff --

          I think it's better to use try-with-resources here.
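The suggestion above can be illustrated with a small, library-free sketch. `FakeDb` is a hypothetical stand-in for a native handle and is not a RocksDB class; the sketch only assumes that the real object implements `AutoCloseable`, as the review comment implies. It shows that `close()` runs whether or not the body throws, and that it runs before any `catch` handler.

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesSketch {

    /** Hypothetical stand-in for a native handle such as a database; not a RocksDB class. */
    static final class FakeDb implements AutoCloseable {
        private final List<String> log;

        FakeDb(List<String> log) {
            this.log = log;
            log.add("open");
        }

        void put(String key) {
            if (key.isEmpty()) {
                throw new IllegalArgumentException("empty key");
            }
            log.add("put " + key);
        }

        @Override
        public void close() {
            log.add("close");
        }
    }

    /** Runs one operation and returns the recorded lifecycle events. */
    public static List<String> run(String key) {
        List<String> log = new ArrayList<>();
        try (FakeDb db = new FakeDb(log)) {
            db.put(key);
        } catch (IllegalArgumentException e) {
            // The resource has already been closed when this handler runs.
            log.add("caught " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("key"));
        System.out.println(run(""));
    }
}
```

Compared with an explicit `try { ... } finally { close(); }`, this form keeps the variable scoped to the block and needs no separate cleanup clause.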

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3652#discussion_r108869236

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java ---
          @@ -54,59 +54,62 @@ public static void main(String[] args) throws Exception {

            final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());

          - final String key = "key";
          - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
          + try {
          +     final String key = "key";
          +     final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

          - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
          +     final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          +     final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);

          - final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
          +     final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);

          - final Unsafe unsafe = MemoryUtils.UNSAFE;
          - final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
          +     final Unsafe unsafe = MemoryUtils.UNSAFE;
          +     final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;

          - final int num = 50000;
          - System.out.println("begin insert");
          +     final int num = 50000;
          +     System.out.println("begin insert");

          - final long beginInsert = System.nanoTime();
          - for (int i = 0; i < num; i++) {
          -     unsafe.putInt(keyTemplate, offset, i);
          -     rocksDB.put(write_options, keyTemplate, valueBytes);
          - }
          - final long endInsert = System.nanoTime();
          - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
          -
          - final byte[] resultHolder = new byte[num * valueBytes.length];
          -
          - final long beginGet = System.nanoTime();
          -
          - final RocksIterator iterator = rocksDB.newIterator();
          - int pos = 0;
          -
          - // seek to start
          - unsafe.putInt(keyTemplate, offset, 0);
          - iterator.seek(keyTemplate);
          -
          - // mark end
          - unsafe.putInt(keyTemplate, offset, -1);
          -
          - // iterate
          - while (iterator.isValid()) {
          -     byte[] currKey = iterator.key();
          -     if (samePrefix(keyBytes, currKey)) {
          -         byte[] currValue = iterator.value();
          -         System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
          -         pos += currValue.length;
          -         iterator.next();
          +     final long beginInsert = System.nanoTime();
          +     for (int i = 0; i < num; i++) {
          +         unsafe.putInt(keyTemplate, offset, i);
          +         rocksDB.put(write_options, keyTemplate, valueBytes);
                }
          -     else {
          -         break;
          +     final long endInsert = System.nanoTime();
          +     System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
          +
          +     final byte[] resultHolder = new byte[num * valueBytes.length];
          +
          +     final long beginGet = System.nanoTime();
          +
          +     final RocksIterator iterator = rocksDB.newIterator();
          --- End diff --

          The iterator should be closed once it's not used. So it's better to use try-with-resources here.
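A minimal sketch of this scoping, assuming the iterator type implements `AutoCloseable`. `Handle` is a hypothetical stand-in that only records its lifecycle; it is not `RocksIterator` or `RocksDB`. The inner `try` block closes the iterator as soon as the scan finishes, while the database stays open for the remaining work.

```java
import java.util.ArrayList;
import java.util.List;

public class IteratorScopeSketch {

    /** Hypothetical closeable that records its lifecycle; stands in for a native handle. */
    static final class Handle implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Handle(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Returns the order of events when the iterator is scoped inside the database block. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Handle db = new Handle("db", log)) {
            // Inner scope: the iterator lives only as long as the scan needs it.
            try (Handle iterator = new Handle("iterator", log)) {
                log.add("scan");
            }
            // The iterator is already closed here; the database is still open.
            log.add("report");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```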

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3652#discussion_r108870133

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java ---
          @@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception {

            final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());

          - final String key = "key";
          - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
          + try {
          +     final String key = "key";
          +     final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

          - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
          +     final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          +     final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);

          - final int num = 50000;
          +     final int num = 50000;

          - // ----- insert -----
          - System.out.println("begin insert");
          +     // ----- insert -----
          +     System.out.println("begin insert");

          - final long beginInsert = System.nanoTime();
          - for (int i = 0; i < num; i++) {
          -     rocksDB.merge(write_options, keyBytes, valueBytes);
          - }
          - final long endInsert = System.nanoTime();
          - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
          +     final long beginInsert = System.nanoTime();
          +     for (int i = 0; i < num; i++) {
          +         rocksDB.merge(write_options, keyBytes, valueBytes);
          +     }
          +     final long endInsert = System.nanoTime();
          +     System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");

          - // ----- read (attempt 1) -----
          +     // ----- read (attempt 1) -----

          - final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
          - final long beginGet1 = System.nanoTime();
          - rocksDB.get(keyBytes, resultHolder);
          - final long endGet1 = System.nanoTime();
          +     final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
          +     final long beginGet1 = System.nanoTime();
          +     rocksDB.get(keyBytes, resultHolder);
          +     final long endGet1 = System.nanoTime();

          - System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
          +     System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");

          - // ----- read (attempt 2) -----
          +     // ----- read (attempt 2) -----

          - final long beginGet2 = System.nanoTime();
          - rocksDB.get(keyBytes, resultHolder);
          - final long endGet2 = System.nanoTime();
          +     final long beginGet2 = System.nanoTime();
          +     rocksDB.get(keyBytes, resultHolder);
          +     final long endGet2 = System.nanoTime();

          - System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
          +     System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");

          - // ----- compact -----
          - System.out.println("compacting...");
          - final long beginCompact = System.nanoTime();
          - rocksDB.compactRange();
          - final long endCompact = System.nanoTime();
          +     // ----- compact -----
          +     System.out.println("compacting...");
          +     final long beginCompact = System.nanoTime();
          +     rocksDB.compactRange();
          +     final long endCompact = System.nanoTime();

          - System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
          +     System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");

          - // ----- read (attempt 3) -----
          +     // ----- read (attempt 3) -----

          - final long beginGet3 = System.nanoTime();
          - rocksDB.get(keyBytes, resultHolder);
          - final long endGet3 = System.nanoTime();
          +     final long beginGet3 = System.nanoTime();
          +     rocksDB.get(keyBytes, resultHolder);
          +     final long endGet3 = System.nanoTime();

          - System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
          +     System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
          + } finally {
          +     rocksDB.close();
          --- End diff --

          I think it's better to delete the RocksDB's directory here.
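One way to do such a cleanup, sketched with only `java.nio.file`. The directory name `benchmark-sketch` and the file `data.bin` are made up for illustration, and this is not necessarily how the pull request or Flink's own utilities handle it. The tree is deleted children first, inside a `finally` block so it runs even if the work in between fails.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TempDirCleanupSketch {

    /** Deletes a directory tree, children before parents. */
    public static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order lists deeper paths first, so each directory is empty when deleted.
            ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : ordered) {
            Files.delete(path);
        }
    }

    /** Creates a scratch directory, writes into it, cleans up, and reports whether it still exists. */
    public static boolean run() {
        try {
            Path dir = Files.createTempDirectory("benchmark-sketch");
            try {
                Path sub = Files.createDirectories(dir.resolve("sub"));
                Files.write(sub.resolve("data.bin"), new byte[] {1, 2, 3});
            } finally {
                // Runs even if the work above throws.
                deleteRecursively(dir);
            }
            return Files.exists(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("directory still exists: " + run());
    }
}
```

In the benchmark, the database would have to be closed before its directory is removed, so the delete belongs after `rocksDB.close()`.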

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3652#discussion_r108868849

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java ---
          @@ -50,55 +50,59 @@ public static void main(String[] args) throws Exception {

            final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());

          - final String key = "key";
          - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
          + try {
          +     final String key = "key";
          +     final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

          - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
          +     final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
          +     final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);

          - final int num = 50000;
          +     final int num = 50000;

          - // ----- insert -----
          - System.out.println("begin insert");
          +     // ----- insert -----
          +     System.out.println("begin insert");

          - final long beginInsert = System.nanoTime();
          - for (int i = 0; i < num; i++) {
          -     rocksDB.merge(write_options, keyBytes, valueBytes);
          - }
          - final long endInsert = System.nanoTime();
          - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
          +     final long beginInsert = System.nanoTime();
          +     for (int i = 0; i < num; i++) {
          +         rocksDB.merge(write_options, keyBytes, valueBytes);
          +     }
          +     final long endInsert = System.nanoTime();
          +     System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");

          - // ----- read (attempt 1) -----
          +     // ----- read (attempt 1) -----

          - final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
          - final long beginGet1 = System.nanoTime();
          - rocksDB.get(keyBytes, resultHolder);
          - final long endGet1 = System.nanoTime();
          +     final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
          +     final long beginGet1 = System.nanoTime();
          +     rocksDB.get(keyBytes, resultHolder);
          +     final long endGet1 = System.nanoTime();

          - System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
          +     System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");

          - // ----- read (attempt 2) -----
          +     // ----- read (attempt 2) -----

          - final long beginGet2 = System.nanoTime();
          - rocksDB.get(keyBytes, resultHolder);
          - final long endGet2 = System.nanoTime();
          +     final long beginGet2 = System.nanoTime();
          +     rocksDB.get(keyBytes, resultHolder);
          +     final long endGet2 = System.nanoTime();

          - System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
          +     System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");

          - // ----- compact -----
          - System.out.println("compacting...");
          - final long beginCompact = System.nanoTime();
          - rocksDB.compactRange();
          - final long endCompact = System.nanoTime();
          +     // ----- compact -----
          +     System.out.println("compacting...");
          +     final long beginCompact = System.nanoTime();
          +     rocksDB.compactRange();
          +     final long endCompact = System.nanoTime();

          - System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
          +     System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");

          - // ----- read (attempt 3) -----
          +     // ----- read (attempt 3) -----

          - final long beginGet3 = System.nanoTime();
          - rocksDB.get(keyBytes, resultHolder);
          - final long endGet3 = System.nanoTime();
          +     final long beginGet3 = System.nanoTime();
          +     rocksDB.get(keyBytes, resultHolder);
          +     final long endGet3 = System.nanoTime();

          - System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
          +     System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
          + } finally {
          +     rocksDB.close();
          --- End diff --

          Besides `RocksDB`, `Options` and `WriteOptions` should also be closed here.
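
One way to close several handles together is a single try-with-resources statement, which closes them in reverse order of declaration even if the benchmark body throws. The sketch below is only an illustration: `Handle` is a hypothetical stand-in for `Options`, `WriteOptions` and `RocksDB`, not the real RocksDB classes, and it assumes those types can be used as `AutoCloseable` resources. It is not the change that was merged for this issue.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderSketch {

    /** Hypothetical stand-in for a native-backed handle (Options, WriteOptions, RocksDB). */
    static final class Handle implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Handle(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            // Record the close so the ordering can be inspected afterwards.
            log.add("close " + name);
        }
    }

    /** Runs a benchmark-shaped body and returns the recorded sequence of events. */
    public static List<String> run(boolean failInBody) {
        List<String> log = new ArrayList<>();
        // Resources are closed in reverse declaration order: db, writeOptions, options.
        try (Handle options = new Handle("options", log);
             Handle writeOptions = new Handle("writeOptions", log);
             Handle db = new Handle("db", log)) {
            if (failInBody) {
                throw new IllegalStateException("simulated benchmark failure");
            }
            log.add("benchmark body");
        } catch (IllegalStateException e) {
            // By the time this runs, all three handles have already been closed.
            log.add("caught " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Compared with a single `finally { rocksDB.close(); }`, this form avoids having to add one `close()` call per handle by hand.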

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on the issue:

          https://github.com/apache/flink/pull/3652

          Hi @shixiaogang, thank you for your review. Closing and cleaning up all resources, such as temp directories, is a good idea. I'll fix them later in this PR.
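
Cleaning up the temp directory could look roughly like the sketch below. It is an assumption-laden illustration, not the code from this PR: the class and method names (`TempDirCleanupSketch`, `deleteRecursively`, `runAndCleanUp`) and the file names are hypothetical, and it uses only `java.nio.file` rather than any Flink or RocksDB utility.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TempDirCleanupSketch {

    /** Deletes a directory tree, visiting children before their parents. */
    public static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse path order puts files and subdirectories ahead of their parent.
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            Files.delete(p);
        }
    }

    /** Creates a temp directory, writes into it, and removes it in a finally block. */
    public static boolean runAndCleanUp() {
        try {
            Path dir = Files.createTempDirectory("bench-sketch");
            try {
                // Stand-in for the files a benchmark run would leave behind.
                Path db = Files.createDirectories(dir.resolve("db"));
                Files.write(db.resolve("data.bin"), new byte[] {1, 2, 3});
            } finally {
                deleteRecursively(dir);
            }
            return !Files.exists(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("temp directory removed: " + runAndCleanUp());
    }
}
```

In a real benchmark the database handle would need to be closed before the directory is deleted, so the cleanup would sit in the outermost `finally`.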

          yuzhihong@gmail.com Ted Yu added a comment -

          I agree all resources should be closed.

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on the issue:

          https://github.com/apache/flink/pull/3652

          Hi @shixiaogang, as we discussed, I have closed all resources in the latest commit. Thank you for your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3652

          Thank you for the patch!
          Taking this over and merging this...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3652

          StephanEwen Stephan Ewen added a comment -

          Fixed in ea054a7d3bc452d153c070b2789ddbe6a2f080a7


            People

            • Assignee:
              zjureel Fang Yong
              Reporter:
              yuzhihong@gmail.com Ted Yu

                Development