diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
index 395d2a8..49c286c 100644
--- a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
+++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -45,10 +45,13 @@ public class IgniteSinkTask extends SinkTask {
     private static String igniteConfigFile;
 
     /** Cache name. */
-    private static String cacheName;
+    private String cacheName;
 
     /** Entry transformer. */
-    private static StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;
+    private StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;
+
+    /** Data streamer instance */
+    private IgniteDataStreamer streamer;
 
     /** {@inheritDoc} */
     @Override public String version() {
@@ -61,24 +64,10 @@ public class IgniteSinkTask extends SinkTask {
      * @param props Task properties.
      */
     @Override public void start(Map<String, String> props) {
-        // Each task has the same parameters -- avoid setting more than once.
-        if (cacheName != null)
-            return;
-
         cacheName = props.get(IgniteSinkConstants.CACHE_NAME);
         igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);
 
-        if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
-            StreamerContext.getStreamer().allowOverwrite(
-                Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));
-
-        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
-            StreamerContext.getStreamer().perNodeBufferSize(
-                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));
-
-        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
-            StreamerContext.getStreamer().perNodeParallelOperations(
-                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+        streamer = IgniteContext.getIgnite().dataStreamer(cacheName);
 
         if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) {
             String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
@@ -96,6 +85,18 @@ public class IgniteSinkTask extends SinkTask {
             }
         }
 
+        if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
+            streamer.allowOverwrite(
+                Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
+            streamer.perNodeBufferSize(
+                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
+            streamer.perNodeParallelOperations(
+                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+
         stopped = false;
     }
 
@@ -111,11 +112,11 @@ public class IgniteSinkTask extends SinkTask {
                 // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
                 if (extractor != null) {
                     Map.Entry<Object, Object> entry = extractor.extract(record);
-                    StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue());
+                    streamer.addData(entry.getKey(), entry.getValue());
                 }
                 else {
                     if (record.key() != null) {
-                        StreamerContext.getStreamer().addData(record.key(), record.value());
+                        streamer.addData(record.key(), record.value());
                     }
                     else {
                         log.error("Failed to stream a record with null key!");
@@ -139,7 +140,7 @@ public class IgniteSinkTask extends SinkTask {
         if (stopped)
             return;
 
-        StreamerContext.getStreamer().flush();
+        streamer.flush();
     }
 
     /**
@@ -151,7 +152,7 @@ public class IgniteSinkTask extends SinkTask {
 
         stopped = true;
 
-        StreamerContext.getIgnite().close();
+        IgniteContext.getIgnite().close();
     }
 
     /**
@@ -161,25 +162,20 @@ public class IgniteSinkTask extends SinkTask {
      */
     protected static void setStopped(boolean stopped) {
         IgniteSinkTask.stopped = stopped;
-
-        extractor = null;
     }
 
     /**
      * Streamer context initializing grid and data streamer instances on demand.
      */
-    public static class StreamerContext {
+    public static class IgniteContext {
         /** Constructor. */
-        private StreamerContext() {
+        private IgniteContext() {
         }
 
         /** Instance holder. */
         private static class Holder {
             /** */
             private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
-
-            /** */
-            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
         }
 
         /**
@@ -190,14 +186,5 @@ public class IgniteSinkTask extends SinkTask {
         public static Ignite getIgnite() {
             return Holder.IGNITE;
         }
-
-        /**
-         * Obtains data streamer instance.
-         *
-         * @return Data streamer instance.
-         */
-        public static IgniteDataStreamer getStreamer() {
-            return Holder.STREAMER;
-        }
     }
 }
