From 37d685447fc515d1fae44e3cf3c635ab3a927c88 Mon Sep 17 00:00:00 2001
From: Gordon Wang <gwang@eclipseoptions.com>
Date: Thu, 9 Jul 2020 15:41:36 +0800
Subject: [PATCH] KAFKA-10252: Use source topic name for
 MeteredTimestampedKeyValueStore backing globalKTables

---
 .../streams/state/internals/MeteredTimestampedKeyValueStore.java      | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index d1446dce7..de3f0ed21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -52,7 +53,8 @@ public class MeteredTimestampedKeyValueStore<K, V>
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            context instanceof GlobalProcessorContextImpl ? name() :
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
     }
-- 
2.16.5

