diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 42b1292..204716f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -501,7 +502,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
         try {
-            waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+            waitOnMetadata(Utils.notNull(topic), this.metadataFetchTimeoutMs);
         } catch (InterruptedException e) {
             throw new InterruptException(e);
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 49f1427..31c8d0c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -25,7 +25,17 @@ import org.junit.Test;
 import java.util.Properties;
 
 public class KafkaProducerTest {
+    @Test(expected = NullPointerException.class)
+    public void topicNotNull() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9999");
+        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
+                props, new ByteArraySerializer(), new ByteArraySerializer());
+        producer.partitionsFor(null);
+    }
 
     @Test
     public void testConstructorClose() throws Exception {
