diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 8f7503e..8d41eee 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.storage.ConfigBackingStore; @@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -108,6 +110,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final Time time; private final String workerGroupId; + private final Map types = new HashMap<>(); private final int workerSyncTimeoutMs; private final long workerTasksShutdownTimeoutMs; private final int workerUnsyncBackoffMs; @@ -430,7 +433,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (!configState.contains(connName)) { callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); } else { - callback.onCompletion(null, new ConnectorInfo(connName, configState.connectorConfig(connName), configState.tasks(connName))); + callback.onCompletion(null, new ConnectorInfo(connName, configState.connectorConfig(connName), + configState.tasks(connName), types.get(connName))); } return null; } @@ -485,14 +489,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { ConfigDef configDef, Map config) { Map validatedConfig = super.validateBasicConnectorConfig(connector, configDef, config); + ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG); + String name = (String) validatedName.value(); if (connector instanceof SinkConnector) { - ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG); - String name = (String) validatedName.value(); - if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) { validatedName.addErrorMessage("Consumer group for sink connector named " + name + " conflicts with Connect worker group " + workerGroupId); } + types.put(name, ConnectorType.SINK); + } else { + types.put(name, ConnectorType.SOURCE); } return validatedConfig; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java index 9179d3b..9a10d74 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java @@ -30,14 +30,17 @@ public class ConnectorInfo { private final String name; private final Map config; private final List tasks; + private final ConnectorType type; @JsonCreator public ConnectorInfo(@JsonProperty("name") String name, @JsonProperty("config") Map config, - @JsonProperty("tasks") List tasks) { + @JsonProperty("tasks") List tasks, + @JsonProperty("type") ConnectorType type) { this.name = name; this.config = config; this.tasks = tasks; + this.type = type; } @@ -47,6 +50,11 @@ public class ConnectorInfo { } @JsonProperty + public ConnectorType type() { + return type; + } + + @JsonProperty public Map config() { return config; }