From b0cdaa9545323a6b2a7a9141c4f227f4c671e3a6 Mon Sep 17 00:00:00 2001
From: Andrii Biletskyi <andrii.biletskyi@stealth.ly>
Date: Wed, 9 Dec 2015 15:46:32 +0200
Subject: [PATCH] KIP-30 Plugins infrastructure and Leader Election integration
 [WIP]

---
 bin/kafka-run-class.sh                             |  2 +-
 build.gradle                                       | 32 ++++++++
 .../scala/kafka/controller/KafkaController.scala   | 50 ++++--------
 .../kafka/plugin/PluginsClassLoaderHelper.scala    | 46 +++++++++++
 core/src/main/scala/kafka/server/KafkaConfig.scala | 23 ++++++
 core/src/main/scala/kafka/server/KafkaServer.scala |  9 ++-
 .../main/scala/kafka/server/LeaderElector.scala    |  2 +
 .../scala/kafka/server/LeaderElectorAdapter.scala  | 87 +++++++++++++++++++++
 .../kafka/plugin/interface/LeaderElection.scala    | 91 ++++++++++++++++++++++
 .../kafka/plugin/interface/ListenerRegistry.scala  | 77 ++++++++++++++++++
 .../kafka/plugin/interface/PluginLocator.scala     | 31 ++++++++
 settings.gradle                                    |  2 +-
 12 files changed, 414 insertions(+), 38 deletions(-)
 create mode 100644 core/src/main/scala/kafka/plugin/PluginsClassLoaderHelper.scala
 create mode 100644 core/src/main/scala/kafka/server/LeaderElectorAdapter.scala
 create mode 100644 plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/LeaderElection.scala
 create mode 100644 plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/ListenerRegistry.scala
 create mode 100644 plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/PluginLocator.scala

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fcf442b..ef9dcf9 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -179,5 +179,5 @@ fi
 if [ "x$DAEMON_MODE" = "xtrue" ]; then
   nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
 else
-  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
+  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"$base_dir/plugin/plugin.jar" $KAFKA_OPTS "$@"
 fi
diff --git a/build.gradle b/build.gradle
index e24279e..23ead68 100644
--- a/build.gradle
+++ b/build.gradle
@@ -263,6 +263,7 @@ project(':core') {
 
   dependencies {
     compile project(':clients')
+    compile project(':plugin-interface')
     compile "$slf4jlog4j"
     compile "org.scala-lang:scala-library:$resolvedScalaVersion"
     compile 'org.apache.zookeeper:zookeeper:3.4.6'
@@ -356,6 +357,7 @@ project(':core') {
     from(project(':connect:json').configurations.runtime) { into("libs/") }
     from(project(':connect:file').jar) { into("libs/") }
     from(project(':connect:file').configurations.runtime) { into("libs/") }
+    //from(project(':plugin-interface').jar) { into("libs/") }
   }
 
   jar {
@@ -902,3 +904,33 @@ project(':connect:file') {
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
+
+project(':plugin-interface') {
+    println "Building project 'plugin-interface' with Scala version $resolvedScalaVersion"
+    apply plugin: 'scala'
+
+    archivesBaseName = "kafka-plugin-interface"
+
+    dependencies {
+        compile "org.scala-lang:scala-library:$resolvedScalaVersion"
+    }
+
+    jar {
+//        dependsOn 'copyDependantLibs'
+    }
+
+    clean.doFirst {
+        delete "$buildDir/kafka/"
+    }
+
+//    test {
+//        testLogging {
+//            events "passed", "skipped", "failed"
+//            exceptionFormat = 'full'
+//        }
+//    }
+
+    configurations {
+        archives.extendsFrom(testCompile)
+    }
+}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 7c03a24..ac035fa 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -20,6 +20,7 @@ import java.util
 
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
+import org.apache.kafka.plugin.interface.LeaderElection
 
 import scala.collection._
 import com.yammer.metrics.core.Gauge
@@ -153,15 +154,22 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config : KafkaConfig,
+                      zkUtils: ZkUtils,
+                      val brokerState: BrokerState,
+                      time: Time, metrics: Metrics,
+                      threadNamePrefix: Option[String] = None,
+                      controllerElection: LeaderElection) extends Logging with KafkaMetricsGroup {
+
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
   val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
-  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
-    onControllerResignation, config.brokerId)
+
+  private val controllerElector = new LeaderElectorAdapter(onControllerFailover, onControllerResignation, config.brokerId, controllerElection)
+
   // have a separate scheduler for the controller to be able to start and stop independently of the
   // kafka server
   private val autoRebalanceScheduler = new KafkaScheduler(1)
@@ -277,7 +285,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
                       // Resign if the controller is in an illegal state
                       error("Forcing the controller to resign")
                       brokerRequestBatch.clear()
-                      controllerElector.resign()
+                      controllerElector.resign
 
                       throw e
                     }
@@ -731,7 +739,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
   }
 
   private def registerSessionExpirationListener() = {
-    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
+    //zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
   }
 
   private def initializeControllerContext() {
@@ -912,7 +920,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
             // Resign if the controller is in an illegal state
             error("Forcing the controller to resign")
             brokerRequestBatch.clear()
-            controllerElector.resign()
+            controllerElector.resign
 
             throw e
           }
@@ -1032,7 +1040,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
         // Resign if the controller is in an illegal state
         error("Forcing the controller to resign")
         brokerRequestBatch.clear()
-        controllerElector.resign()
+        controllerElector.resign
 
         throw e
       }
@@ -1150,34 +1158,6 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     finalLeaderIsrAndControllerEpoch
   }
 
-  class SessionExpirationListener() extends IZkStateListener with Logging {
-    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
-    @throws(classOf[Exception])
-    def handleStateChanged(state: KeeperState) {
-      // do nothing, since zkclient will do reconnect for us.
-    }
-
-    /**
-     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
-     * any ephemeral nodes here.
-     *
-     * @throws Exception
-     *             On any error.
-     */
-    @throws(classOf[Exception])
-    def handleNewSession() {
-      info("ZK expired; shut down all controller components and try to re-elect")
-      inLock(controllerContext.controllerLock) {
-        onControllerResignation()
-        controllerElector.elect
-      }
-    }
-
-    override def handleSessionEstablishmentError(error: Throwable): Unit = {
-      //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
-    }
-  }
-
   private def checkAndTriggerPartitionRebalance(): Unit = {
     if (isActive()) {
       trace("checking need to trigger partition rebalance")
diff --git a/core/src/main/scala/kafka/plugin/PluginsClassLoaderHelper.scala b/core/src/main/scala/kafka/plugin/PluginsClassLoaderHelper.scala
new file mode 100644
index 0000000..e20831a
--- /dev/null
+++ b/core/src/main/scala/kafka/plugin/PluginsClassLoaderHelper.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.plugin
+
+import java.io.File
+import java.net.URLClassLoader
+
+import org.apache.kafka.plugin.interface.{PluginLocator, LeaderElection}
+
+/**
+ * Loads PluginLocator class - an entry point for using plugins in Kafka.
+ * Looks for a specific classname (depends on the plugin implementation) in provided
+ * additional jars. It is assumed that these additional jars were provided in classpath
+ * during the Kafka startup.
+ *
+ * // TODO automate classloading - look for the PluginLocator interface implementation
+ * // (no need to provide pluginLocatorClassname param), fail-fast in case more than one is provided
+ */
+class PluginsClassLoaderHelper(additionalJars: Seq[String],
+                               pluginLocatorClassname: String) {
+
+  private val urls = additionalJars.map {
+    jar =>
+      new File(jar).toURI.toURL
+  }
+
+  private val classloader = new URLClassLoader(urls.toArray)
+
+  def loadPluginLocator(): PluginLocator = {
+    classloader.loadClass(pluginLocatorClassname).newInstance().asInstanceOf[PluginLocator]
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 32dfe42..2d4d564 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -176,6 +176,9 @@ object Defaults {
   val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
   val SaslKerberosPrincipalToLocalRules = SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
 
+  /** ********* Plugins ***********/
+  val AdditionalJars = "plugin/plugin.jar"
+
 }
 
 object KafkaConfig {
@@ -331,6 +334,11 @@ object KafkaConfig {
   val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN
   val SaslKerberosPrincipalToLocalRulesProp = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
 
+  /** ********* Plugins ***********/
+  val AdditionalJarsProp = "plugin.additional.jars"
+  val PluginLocatorClassnameProp = "plugin.locator.classname"
+  val PluginConfigurationFileProp = "plugin.configuration.file"
+
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Zookeeper host string"
@@ -506,6 +514,11 @@ object KafkaConfig {
   val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
   val SaslKerberosPrincipalToLocalRulesDoc = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
 
+  /** ********* Plugins ***********/
+  val AdditionalJarsDoc = "Additional jars that contain consensus and storage interfaces. The PluginLocator class will be searched in these jars"
+  val PluginLocatorClassnameDoc = "The fully qualified class name of the PluginLocator implementation"
+  val PluginConfigurationFileDoc = "The path to plugins configuration file. The file structure and content is defined by the plugins implementations"
+
   private val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -670,6 +683,11 @@ object KafkaConfig {
       .define(SaslKerberosMinTimeBeforeReloginProp, LONG, Defaults.SaslKerberosMinTimeBeforeRelogin, MEDIUM, SaslKerberosMinTimeBeforeReloginDoc)
       .define(SaslKerberosPrincipalToLocalRulesProp, LIST, Defaults.SaslKerberosPrincipalToLocalRules, MEDIUM, SaslKerberosPrincipalToLocalRulesDoc)
 
+      /** ********* Plugins ***********/
+      .define(AdditionalJarsProp, LIST, Defaults.AdditionalJars, HIGH, AdditionalJarsDoc)
+      .define(PluginLocatorClassnameProp, STRING, HIGH, PluginLocatorClassnameDoc)
+      .define(PluginConfigurationFileProp, STRING, HIGH, PluginConfigurationFileDoc)
+
   }
 
   def configNames() = {
@@ -848,6 +866,11 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   val compressionType = getString(KafkaConfig.CompressionTypeProp)
 
+  val additionalJars = getList(KafkaConfig.AdditionalJarsProp)
+  val pluginLocatorClassname = getString(KafkaConfig.PluginLocatorClassnameProp)
+  val pluginConfigurationFile = getString(KafkaConfig.PluginConfigurationFileProp)
+
+
   val listeners = getListeners
   val advertisedListeners = getAdvertisedListeners
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9eedbe2..0c99d27 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -29,6 +29,7 @@ import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.{IOException, File}
 
+import kafka.plugin.PluginsClassLoaderHelper
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest, NetworkClient}
@@ -51,6 +52,7 @@ import kafka.network.{BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import kafka.coordinator.{GroupConfig, GroupCoordinator}
+import scala.collection.JavaConverters._
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -157,6 +159,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
       val canStartup = isStartingUp.compareAndSet(false, true)
       if (canStartup) {
+
+        val clhelper = new PluginsClassLoaderHelper(config.additionalJars.asScala, config.pluginLocatorClassname)
+        val pluginLocator = clhelper.loadPluginLocator()
+        pluginLocator.startup(config.pluginConfigurationFile)
+
         metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
 
         brokerState.newState(Starting)
@@ -184,7 +191,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix, pluginLocator.getLeaderElection)
         kafkaController.startup()
 
         /* start kafka coordinator */
diff --git a/core/src/main/scala/kafka/server/LeaderElector.scala b/core/src/main/scala/kafka/server/LeaderElector.scala
index 14b3fa4..4a8271b 100644
--- a/core/src/main/scala/kafka/server/LeaderElector.scala
+++ b/core/src/main/scala/kafka/server/LeaderElector.scala
@@ -29,5 +29,7 @@ trait LeaderElector extends Logging {
 
   def elect: Boolean
 
+  def resign: Unit
+
   def close
 }
diff --git a/core/src/main/scala/kafka/server/LeaderElectorAdapter.scala b/core/src/main/scala/kafka/server/LeaderElectorAdapter.scala
new file mode 100644
index 0000000..7170d95
--- /dev/null
+++ b/core/src/main/scala/kafka/server/LeaderElectorAdapter.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.{Json, SystemTime}
+import org.apache.kafka.plugin.interface.{LeaderChangeListener, LeaderElection}
+
+class LeaderElectorAdapter(onBecomingLeader: () => Unit,
+                           onResigningAsLeader: () => Unit,
+                           brokerId: Int,
+                           leaderElector: LeaderElection) extends LeaderElector {
+
+  val candidate = brokerId.toString
+
+  @volatile var lastKnownLeader: Option[String] = None
+
+  override def startup: Unit = {
+    leaderElector.addListener(new LeaderChangeListener {
+      override def onLeaderChange(leaderOpt: Option[String]): Unit = {
+        logger.info(s"Leader change event. Last known leader - $lastKnownLeader, new leader - $leaderOpt")
+        val shallBecomeLeader = leaderOpt.exists(_ == candidate)
+        val wasILeader = amILeader
+        lastKnownLeader = leaderOpt
+
+        if (wasILeader) {
+          if (!shallBecomeLeader) {
+            // was a leader but now is not - onResign
+            logger.info(s"Starting resigning as leader since this broker was leader and received update that new leader is $leaderOpt")
+            onResigningAsLeader()
+          } else {
+            logger.info("This broker considered itself a controller but received notification about becoming a controller. It is possible this " +
+              "broker has missed some important updates and is in inconsistent state now. Resigning as a leader.")
+            resign
+          }
+        } else {
+          if (shallBecomeLeader) {
+            // wasn't a leader but became it
+            logger.info("Starting procedures on becoming leader since this broker was not a leader and received notification that now it is")
+            onBecomingLeader()
+          } else {
+            // wasn't a leader and somebody else resigned or acquired leadership - do nothing
+          }
+        }
+      }
+    })
+    elect
+  }
+
+  override def elect: Boolean = {
+    val timestamp = SystemTime.milliseconds.toString
+    val electString = Json.encode(Map("version" -> 1, "brokerid" -> candidate, "timestamp" -> timestamp))
+
+    leaderElector.nominate(brokerId.toString, electString)
+
+    amILeader
+  }
+
+  override def close: Unit = {
+    leaderElector.close()
+  }
+
+  override def resign: Unit = {
+    leaderElector.resign(candidate)
+  }
+
+  override def amILeader: Boolean = {
+    //    leaderElector.getLeader.exists {
+    //      case (leader, _) => leader == candidate
+    //    }
+
+    lastKnownLeader.exists(_ == candidate)
+  }
+}
diff --git a/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/LeaderElection.scala b/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/LeaderElection.scala
new file mode 100644
index 0000000..2ae7749
--- /dev/null
+++ b/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/LeaderElection.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.plugin.interface
+
+/**
+ * A connector for leadership election protocol. Supports two modes:
+ * 1) "running for election" (joining the candidates for leadership, resigning as a leader, subscribing to change notifications)
+ * 2) "observing" (getting current leader, subscribing to change notifications)
+ *
+ */
+trait LeaderElection {
+  /**
+   * Each instance of this class is tightly coupled with leadership over exactly one service (resource),
+   * once set (during initialization) cannot be changed
+   *
+   * @return unique group identifier among all application services (resources)
+   */
+  def service: String
+
+  /**
+   * Get current leader of the resource (if any)
+   * @return the leader id if it exists
+   */
+  def getLeader: Option[(String, String)]
+
+  /**
+   * Make this candidate eligible for leader election and try to obtain leadership for this service if it's vacant
+   *
+   * @param candidate ID of the candidate which is eligible for election
+   */
+  def nominate(candidate: String, supData: String): Unit
+
+  /**
+   * Voluntarily resign as a leader and initiate new leader election.
+   * It's the client's responsibility to stop all leader duties before calling this method to avoid more-than-one-leader cases
+   *
+   * @param leader current leader ID (will be ignored if not a leader)
+   */
+  def resign(leader: String): Unit
+
+  /**
+   * Register permanent on leader change listener
+   * There is no guarantee the listener will be fired on ALL events (due to session reconnects etc.)
+   * @param listener see [[LeaderChangeListener]]
+   */
+  def addListener(listener: LeaderChangeListener)
+
+  /**
+   * Deregister on leader change listener
+   * @param listener see [[LeaderChangeListener]]
+   */
+  def removeListener(listener: LeaderChangeListener)
+
+  /**
+   * Setup everything needed for concrete implementation
+   * @param context TBD. Should be abstract enough to be used by different implementations and
+   *                at the same time specific because will be uniformly called from the Kafka code -
+   *                regardless of the implementation
+   */
+  def init(context: Any): Unit
+
+  /**
+   * Release all acquired resources
+   */
+  def close(): Unit
+}
+
+/**
+ * A callback fired on leader change event
+ */
+trait LeaderChangeListener {
+  /**
+   * Event fired when the leader has changed (resigned or acquired a leadership)
+   * @param leader new leader for the given service if one has been elected, otherwise None
+   */
+  def onLeaderChange(leader: Option[String])
+}
diff --git a/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/ListenerRegistry.scala b/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/ListenerRegistry.scala
new file mode 100644
index 0000000..65381ce
--- /dev/null
+++ b/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/ListenerRegistry.scala
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.plugin.interface
+
+/**
+ * A registry for async data change notifications
+ */
+trait ListenerRegistry {
+  /**
+   * Register permanent callback for data change event
+   * @param key the listenable data identifier
+   * @param eventListener see [[ValueChangeListener]]
+   */
+  def addValueChangeListener(key: String, eventListener: ValueChangeListener): Unit
+
+  /**
+   * Deregister permanent callback for data change event
+   * @param key the listenable data identifier
+   * @param eventListener see [[ValueChangeListener]]
+   */
+  def removeValueChangeListener(key: String, eventListener: ValueChangeListener): Unit
+
+  /**
+   * Register permanent callback for key-set change event
+   * @param namespace the listenable key-set identifier (e.g. parent path in Zookeeper, table name in Database etc)
+   * @param eventListener see [[KeySetChangeListener]]
+   */
+  def addKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit
+
+  /**
+   * Deregister permanent callback for key-set change event
+   * @param namespace the listenable key-set identifier (e.g. parent path in Zookeeper, table name in Database etc)
+   * @param eventListener see [[KeySetChangeListener]]
+   */
+  def removeKeySetChangeListener(namespace: String, eventListener: KeySetChangeListener): Unit
+
+  /**
+   * Setup everything needed for concrete implementation
+   * @param context TBD. Should be abstract enough to be used by different implementations and
+   *                at the same time specific because will be uniformly called from the Kafka code,
+   *                regardless of the implementation
+   */
+  def init(context: Any): Unit
+
+  /**
+   * Release all acquired resources
+   */
+  def close(): Unit
+}
+
+/**
+ * Callback on value change event
+ */
+trait ValueChangeListener {
+  def valueChanged(newValue: Option[String])
+}
+
+/**
+ * Callback on key-set change event
+ */
+trait KeySetChangeListener {
+  def keySetChanged(newKeySet: Set[String])
+}
diff --git a/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/PluginLocator.scala b/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/PluginLocator.scala
new file mode 100644
index 0000000..95ad805
--- /dev/null
+++ b/plugin-interface/src/main/scala/org/apache/kafka/plugin/interface/PluginLocator.scala
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.plugin.interface
+
+trait PluginLocator {
+
+  def startup(configFile: String): Unit
+
+  def getLeaderElection: LeaderElection
+
+  def getListenerRegistry: ListenerRegistry
+
+  //def getGroupMembership: GroupMembership
+
+  //def getStorage: Storage
+
+}
diff --git a/settings.gradle b/settings.gradle
index 3d69fac..0b7724e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,4 +15,4 @@
 
 apply from: file('scala.gradle')
 include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
+        'connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'plugin-interface'
-- 
1.9.1

