Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-9804

KeyedStateBackend.getKeys() does not work on RocksDB MapState

    XMLWordPrintableJSON

    Details

      Description

      This can be reproduced by adding this test to StateBackendTestBase:

      @Test
      public void testMapStateGetKeys() throws Exception {
      	final int namespace1ElementsNum = 1000;
      	final int namespace2ElementsNum = 1000;
      	String fieldName = "get-keys-test";
      	AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
      	try {
      		final String ns1 = "ns1";
      		MapState<String, Integer> keyedState1 = backend.getPartitionedState(
      			ns1,
      			StringSerializer.INSTANCE,
      			new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
      		);
      
      		for (int key = 0; key < namespace1ElementsNum; key++) {
      			backend.setCurrentKey(key);
      			keyedState1.put("he", key * 2);
      			keyedState1.put("ho", key * 2);
      		}
      
      		final String ns2 = "ns2";
      		MapState<String, Integer> keyedState2 = backend.getPartitionedState(
      			ns2,
      			StringSerializer.INSTANCE,
      			new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
      		);
      
      		for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
      			backend.setCurrentKey(key);
      			keyedState2.put("he", key * 2);
      			keyedState2.put("ho", key * 2);
      		}
      
      		// valid for namespace1
      		try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
      			PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
      
      			for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
      				assertTrue(actualIterator.hasNext());
      				assertEquals(expectedKey, actualIterator.nextInt());
      			}
      
      			assertFalse(actualIterator.hasNext());
      		}
      
      		// valid for namespace2
      		try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
      			PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
      
      			for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
      				assertTrue(actualIterator.hasNext());
      				assertEquals(expectedKey, actualIterator.nextInt());
      			}
      
      			assertFalse(actualIterator.hasNext());
      		}
      	}
      	finally {
      		IOUtils.closeQuietly(backend);
      		backend.dispose();
      	}
      }
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                sihuazhou Sihua Zhou
                Reporter:
                aljoscha Aljoscha Krettek
              • Votes:
                0 Vote for this issue
                Watchers:
                6 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: