diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 15a81da..eb93241 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -29,7 +29,9 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Random; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; @@ -94,6 +96,7 @@ private String user; // "hive", private String pwd; // "" private String query; + private final Random rand = new Random(); public static final String URL_KEY = "llap.if.hs2.connection"; public static final String QUERY_KEY = "llap.if.query"; @@ -101,6 +104,7 @@ public static final String PWD_KEY = "llap.if.pwd"; public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; + public static final ServiceInstance[] serviceInstanceArray = new ServiceInstance[0]; public LlapBaseInputFormat(String url, String user, String pwd, String query) { this.url = url; @@ -234,7 +238,11 @@ private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host); if (serviceInstance == null) { - throw new IOException("No service instances found for " + host + " in registry"); + LOG.info("No service instances found for " + host + " in registry."); + serviceInstance = getServiceInstanceRandom(registryService); + if (serviceInstance == null) { + throw new IOException("No service instances found in registry"); + } } return serviceInstance; @@ -272,6 +280,20 @@ private ServiceInstance getServiceInstanceForHost(LlapRegistryService registrySe return serviceInstance; } + + private ServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException { + ServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstance serviceInstance = null; + + LOG.info("Finding random live service instance"); + Collection allInstances = instanceSet.getAll(); + if (allInstances.size() > 0) { + int randIdx = rand.nextInt() % allInstances.size(); + serviceInstance = allInstances.toArray(serviceInstanceArray)[randIdx]; + } + return serviceInstance; + } + private ServiceInstance selectServiceInstance(Set serviceInstances) { if (serviceInstances == null || serviceInstances.isEmpty()) { return null;