Description
This junit test simply fails:
@Test
public void globalTable() {
StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unused")
final KTable<String,String> localTable = builder
.table("local",
Consumed.with(Serdes.String(), Serdes.String()),
Materialized.as("localStore"))
;
@SuppressWarnings("unused")
final GlobalKTable<String,String> globalTable = builder
.globalTable("global",
Consumed.with(Serdes.String(), Serdes.String()),
Materialized.as("globalStore"))
;
//
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
//
final KeyValueStore<String,String> localStore = testDriver.getKeyValueStore("localStore");
Assert.assertNotNull(localStore);
Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
//
final KeyValueStore<String,String> globalStore = testDriver.getKeyValueStore("globalStore");
Assert.assertNotNull(globalStore);
Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
//
final ConsumerRecordFactory<String,String> crf = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
testDriver.pipeInput(crf.create("local", "one", "TheOne"));
testDriver.pipeInput(crf.create("global", "one", "TheOne"));
//
Assert.assertEquals("TheOne", localStore.get("one"));
Assert.assertEquals("TheOne", globalStore.get("one"));
to make it work I had to modify the TopologyTestDriver class as follow:
...
public Map<String, StateStore> getAllStateStores() {
// final Map<String, StateStore> allStores = new HashMap<>();
// for (final String storeName : internalTopologyBuilder.allStateStoreName())
{ // allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName)); // }// return allStores;
// FIXME
final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
final Map<String, StateStore> allStores = new HashMap<>();
for (final String storeName : internalTopologyBuilder.allStateStoreName())
{ StateStore res = psm.getStore(storeName); if (res == null) res = psm.getGlobalStore(storeName); allStores.put(storeName, res); }return allStores;
}
...
public StateStore getStateStore(final String name) {
// return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
// FIXME
final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
StateStore res = psm.getStore(name);
if (res == null)
res = psm.getGlobalStore(name);
return res;
}
moreover I think it would be very useful to make the internal MockProducer public for testing cases where a producer is used along side with the "normal" stream processing by adding the method:
/**
* @return records sent with this producer are automatically streamed to the topology.
*/
public final Producer<byte[], byte[]> getProducer()
{ return producer; }
unfortunately this introduces another problem that could be verified by adding the following lines to the previous junit test:
...
**
//
ConsumerRecord<byte[],byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
testDriver.advanceWallClockTime(0);
Assert.assertEquals("TheOne", localStore.get("one"));
Assert.assertEquals("Second", localStore.get("two"));
Assert.assertEquals("TheOne", globalStore.get("one"));
Assert.assertEquals("Second", globalStore.get("two"));
}
that could be fixed with:
private void captureOutputRecords() {
// Capture all the records sent to the producer ...
final List<ProducerRecord<byte[], byte[]>> output = producer.history();
producer.clear();
for (final ProducerRecord<byte[], byte[]> record : output) {
Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
if (outputRecords == null)
{ outputRecords = *new* LinkedList<>(); outputRecordsByTopic.put(record.topic(), outputRecords); }outputRecords.add(record);
// Forward back into the topology if the produced record is to an internal or a source topic ...
final String outputTopicName = record.topic();
if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
|| globalPartitionsByTopic.containsKey(outputTopicName)) { // FIXME
final byte[] serializedKey = record.key();
final byte[] serializedValue = record.value();
pipeInput(new ConsumerRecord<>(
outputTopicName,
-1,
-1L,
record.timestamp(),
TimestampType.CREATE_TIME,
0L,
serializedKey == null ? 0 : serializedKey.length,
serializedValue == null ? 0 : serializedValue.length,
serializedKey,
serializedValue));
}
}
}
Thank you
Attachments
Issue Links
- links to