diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index 6e6f5b9..e5c4a00 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -27,5 +27,11 @@
       InputFormat sourceInputFormat, Deserializer serde);
   void close();
   String getMemoryInfo();
+
+  /**
+   * Best-effort purge of the IO cache: only buffers that are unlocked (refCount == 0) are released.
+   * Typically called when the system is idle. The returned long is the amount reported as evicted.
+   */
+  long purge();
   void initCacheOnlyInputFormat(InputFormat inputFormat);
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
index 4fbaac1..6a361fa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
@@ -183,6 +183,11 @@ public void setParentDebugDumper(LlapOomDebugDump dumper) {
     realPolicy.setParentDebugDumper(dumper);
   }

+  @Override
+  public long purge() { // pure delegation: the tracker forwards the purge to the policy it wraps
+    return realPolicy.purge(); // result of the wrapped policy is returned unchanged
+  }
+

   @Override
   public long evictSomeBlocks(long memoryToReserve) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
index 2cd70b9..3323636 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
@@ -27,4 +27,5 @@
   long evictSomeBlocks(long memoryToReserve);
   void setEvictionListener(EvictionListener listener);
   void setParentDebugDumper(LlapOomDebugDump dumper);
+  long purge(); // the FIFO and LRFU policies in this patch implement this as evictSomeBlocks(Long.MAX_VALUE) and return its result
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
index 50a2411..f7f80a8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;

+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;

@@ -70,6 +71,13 @@ public void setParentDebugDumper(LlapOomDebugDump dumper) {
   }

   @Override
+  public long purge() {
+    long evicted = evictSomeBlocks(Long.MAX_VALUE); // request Long.MAX_VALUE, i.e. no practical upper bound on how much to evict
+    LlapIoImpl.LOG.info("PURGE: evicted {} from FIFO policy", LlapUtil.humanReadableByteCount(evicted));
+    return evicted; // whatever evictSomeBlocks reported, logged above as a byte count
+  }
+
+  @Override
   public long evictSomeBlocks(long memoryToReserve) {
     return evictInternal(memoryToReserve, -1);
   }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index b42f761..7787cb4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -24,6 +24,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;

@@ -177,6 +178,13 @@ public void setParentDebugDumper(LlapOomDebugDump dumper) {
     this.parentDebugDump = dumper;
   }

+  @Override
+  public long purge() {
+    long evicted = evictSomeBlocks(Long.MAX_VALUE); // same approach as the FIFO policy: evict with an effectively unbounded target
+    LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy", LlapUtil.humanReadableByteCount(evicted));
+    return evicted; // whatever evictSomeBlocks reported, logged above as a byte count
+  }
+

   @Override
   public long evictSomeBlocks(long memoryToReserve) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java
index e88c819..7c72e8b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java
@@ -61,6 +61,7 @@
     WATCH_MODE_TIMEOUT("watchTimeout", 't', "Exit watch mode if the desired state is not attained until the specified" +
       " timeout. (Default " +
       TimeUnit.SECONDS.convert(DEFAULT_WATCH_MODE_TIMEOUT_MS, TimeUnit.MILLISECONDS) +"s)", true),
     HIVECONF("hiveconf", null, "Use value for given property. Overridden by explicit parameters",
       "property=value", 2),
+    PURGE_CACHE("purgeCache", 'p', "Purge LLAP IO Cache", false), // NOTE(review): the trailing false presumably means "takes no argument", as for HELP — confirm
     HELP("help", 'H', "Print help information", false);
@@ -114,16 +115,18 @@ public int getNumArgs() {
     private final long watchTimeout;
     private final float runningNodesThreshold;
     private final boolean isLaunched;
+    private final boolean purgeCache; // whether the caller asked for an IO cache purge

     public LlapStatusOptions(final String name) {
       this(name, new Properties(), FIND_YARN_APP_TIMEOUT_MS, null, DEFAULT_STATUS_REFRESH_INTERVAL_MS, false,
-        DEFAULT_WATCH_MODE_TIMEOUT_MS, DEFAULT_RUNNING_NODES_THRESHOLD, true);
+        DEFAULT_WATCH_MODE_TIMEOUT_MS, DEFAULT_RUNNING_NODES_THRESHOLD, true, false); // last argument: purgeCache is off by default
     }

     public LlapStatusOptions(String name, Properties hiveProperties, long findAppTimeoutMs,
                              String outputFile, long refreshIntervalMs, final boolean watchMode,
                              final long watchTimeoutMs,
-                             final float runningNodesThreshold, final boolean isLaunched) {
+                             final float runningNodesThreshold, final boolean isLaunched,
+                             final boolean purgeCache) {
       this.name = name;
       this.conf = hiveProperties;
       this.findAppTimeoutMs = findAppTimeoutMs;
@@ -133,6 +136,7 @@ public LlapStatusOptions(String name, Properties hiveProperties, long findAppTim
this.watchTimeout = watchTimeoutMs;
       this.runningNodesThreshold = runningNodesThreshold;
       this.isLaunched = isLaunched;
+      this.purgeCache = purgeCache; // stored as-is; exposed through isPurgeCache()
     }

     public String getName() {
@@ -170,6 +174,10 @@ public long getWatchTimeoutMs() {
     public float getRunningNodesThreshold() {
       return runningNodesThreshold;
     }
+
+    public boolean isPurgeCache() { // returns the purge flag given to the constructor
+      return purgeCache;
+    }
   }

   private final Options options = new Options();
@@ -250,8 +258,10 @@ public LlapStatusOptions processOptions(String[] args) throws ParseException {
           "Running nodes threshold value should be between 0.0 and 1.0 (inclusive)");
       }
     }
+
+    boolean isPurgeCache = commandLine.hasOption(OptionConstants.PURGE_CACHE.getLongOpt()); // only the presence of the option is checked
     return new LlapStatusOptions(name, hiveConf, findAppTimeoutMs, outputFile, refreshIntervalMs,
-      watchMode, watchTimeoutMs, runningNodesThreshold, isLaunched);
+      watchMode, watchTimeoutMs, runningNodesThreshold, isLaunched, isPurgeCache);
   }

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index 65b4d81..4e10465 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -27,24 +27,34 @@
 import java.io.PrintWriter;
 import java.net.URISyntaxException;
 import java.text.DecimalFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.DeleteMethod;
 import org.apache.commons.lang3.StringUtils;
 import
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.cli.LlapStatusOptionsProcessor.LlapStatusOptions; import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers; import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.AppStatusBuilder; import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.LlapInstance; import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.State; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; +import org.apache.hadoop.hive.llap.daemon.services.impl.LlapIoMemoryServlet; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.ql.session.SessionState; @@ -800,6 +810,7 @@ private static String constructDiagnostics( SLIDER_CLIENT_ERROR_OTHER(31), LLAP_REGISTRY_ERROR(40), LLAP_JSON_GENERATION_ERROR(50), + LLAP_PURGE_CACHE_ERROR(60), // Error in the script itself - likely caused by an incompatible change, or new functionality / states added. 
INTERNAL_ERROR(100);

@@ -845,7 +856,6 @@ public static void main(String[] args) {
     LOG.info("LLAP status invoked with arguments = {}", Arrays.toString(args));
     int ret = ExitCode.SUCCESS.getInt();
     Clock clock = new SystemClock();
-    long startTime = clock.getTime();
     long lastSummaryLogTime = -1;

     LlapStatusServiceDriver statusServiceDriver = null;
@@ -853,6 +863,17 @@ public static void main(String[] args) {
     try {
       statusServiceDriver = new LlapStatusServiceDriver();
       options = statusServiceDriver.parseOptions(args);
+      boolean purgeCache = options.isPurgeCache();
+      if (purgeCache) {
+        try {
+          ret = statusServiceDriver.handlePurgeCache();
+        } catch (LlapStatusCliException e) {
+          statusServiceDriver.close();
+          logError(e);
+          ret = ExitCode.INTERNAL_ERROR.getInt(); // NOTE(review): drops the ExitCode carried by the exception (e.g. LLAP_PURGE_CACHE_ERROR) — confirm whether it should be propagated
+        }
+        System.exit(ret);
+      }
     } catch (Throwable t) {
       statusServiceDriver.close();
       logError(t);
@@ -1031,6 +1052,106 @@ public static void main(String[] args) {
     System.exit(ret);
   }

+  /**
+   * Purges the LLAP IO cache on every daemon registered in the LLAP registry.
+   *
+   * <p>One HTTP DELETE per instance is issued in parallel; the evicted byte counts reported by the
+   * daemons are summed and a summary is printed to stdout. Instances that do not report a
+   * successful purge are left out of that summary.
+   *
+   * @return 0 once all requests have completed
+   * @throws LlapStatusCliException with LLAP_REGISTRY_ERROR if the registry client cannot be
+   *         created or queried, or with LLAP_PURGE_CACHE_ERROR if a purge request throws
+   */
+  private int handlePurgeCache() throws LlapStatusCliException {
+    if (llapRegistry == null) {
+      try {
+        llapRegistry = LlapRegistryService.getClient(llapRegistryConf);
+      } catch (Exception e) {
+        throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR,
+          "Failed to create llap registry client", e);
+      }
+    }
+    Collection<LlapServiceInstance> instances;
+    try {
+      instances = llapRegistry.getInstances().getAll();
+    } catch (Exception e) {
+      throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR,
+        "Failed to get instances using llap registry client", e);
+    }
+    long totalPurged = 0;
+    int purgedInstances = 0;
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    try {
+      List<Future<Long>> futures = new ArrayList<>(instances.size());
+      for (LlapServiceInstance llapServiceInstance : instances) {
+        futures.add(executorService.submit(new HttpPurgeCallable(llapServiceInstance)));
+      }
+      for (Future<Long> future : futures) {
+        Long purgeMem = future.get();
+        // null means the daemon did not report a successful purge (the callable already logged why)
+        if (purgeMem != null) {
+          totalPurged += purgeMem;
+          purgedInstances++;
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+      throw new LlapStatusCliException(ExitCode.LLAP_PURGE_CACHE_ERROR,
+        "Failed to purge cache", e);
+    } catch (Exception e) {
+      throw new LlapStatusCliException(ExitCode.LLAP_PURGE_CACHE_ERROR,
+        "Failed to purge cache", e);
+    } finally {
+      // the pool is private to this call; never leave its worker threads behind
+      executorService.shutdownNow();
+    }
+
+    System.out.println("Successfully purged a total of " + LlapUtil.humanReadableByteCount(totalPurged) + " LLAP IO" +
+      " cache memory from " + purgedInstances + " instances");
+    return 0;
+  }
+
+  /**
+   * Issues an HTTP DELETE against one daemon's /iomem endpoint. Returns the evicted memory the
+   * daemon reports, or null when the daemon answers with a non-200 status or success=false.
+   */
+  private static class HttpPurgeCallable implements Callable<Long> {
+    private final LlapServiceInstance llapServiceInstance;
+
+    public HttpPurgeCallable(LlapServiceInstance llapServiceInstance) {
+      this.llapServiceInstance = llapServiceInstance;
+    }
+
+    @Override
+    public Long call() throws Exception {
+      String url = llapServiceInstance.getServicesAddress() + "/iomem";
+      HttpClient httpClient = new HttpClient();
+      DeleteMethod deleteMethod = new DeleteMethod(url);
+      try {
+        int statusCode = httpClient.executeMethod(deleteMethod);
+        if (statusCode != 200) {
+          String reason = deleteMethod.getStatusLine().getReasonPhrase();
+          LOG.error("Status code {} returned. {}", statusCode, reason);
+          System.out.println(reason);
+          return null;
+        }
+        ObjectMapper objectMapper = new ObjectMapper();
+        LlapIoMemoryServlet.PurgeResponse purgeResponse = objectMapper.readValue(deleteMethod
+          .getResponseBodyAsString(), LlapIoMemoryServlet.PurgeResponse.class);
+        if (!purgeResponse.isSuccess()) {
+          // the servlet answers 200 with success=false when it has no LLAP IO to purge
+          LOG.error("Purge failed on {}: {}", url, purgeResponse.getMessage());
+          System.out.println(purgeResponse.getMessage());
+          return null;
+        }
+        return purgeResponse.getEvictedMemory();
+      } finally {
+        deleteMethod.releaseConnection();
+      }
+    }
+  }
+
   private static long maybeLogSummary(Clock clock, long lastSummaryLogTime,
                                       LlapStatusServiceDriver statusServiceDriver,
                                       boolean watchMode, long watchTimeout, LlapStatusHelpers.State launchingState) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
index 153ab35..3313e46 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
@@ -17,40 +17,40 @@
 package org.apache.hadoop.hive.llap.daemon.services.impl;

+import java.io.IOException;
 import java.io.PrintWriter;

-import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hive.http.HttpServer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;

-@SuppressWarnings("serial")
 public class LlapIoMemoryServlet extends HttpServlet {

   private static final Log LOG = LogFactory.getLog(LlapIoMemoryServlet.class);
-  static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
-  static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
+  private static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
+  private static final String
ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";

   /**
    * Initialize this servlet.
    */
   @Override
-  public void init() throws ServletException {
+  public void init() {
   }

   /**
    * Process a GET request for the specified resource.
    *
-   * @param request
-   *          The servlet request we are processing
-   * @param response
-   *          The servlet response we are creating
+   * @param request The servlet request we are processing
+   * @param response The servlet response we are creating
    */
   @Override
   public void doGet(HttpServletRequest request, HttpServletResponse response) {
@@ -59,10 +59,10 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) {
         return;
       }
       PrintWriter writer = null;
-
+
       try {
         response.setContentType("text/plain; charset=utf8");
-        response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET");
+        response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET,DELETE");
         response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
         response.setHeader("Cache-Control", "no-transform,public,max-age=60,s-maxage=60");

@@ -85,4 +85,103 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) {
       response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
     }
   }
+
+  /**
+   * JSON payload produced by the DELETE handler: a success flag, a human-readable message and the
+   * amount of cache memory evicted. Kept as a plain bean with a no-arg constructor and accessors.
+   */
+  public static class PurgeResponse {
+    private boolean success = false;
+    private String message = "";
+    private long evictedMemory = 0;
+
+    public PurgeResponse() {
+    }
+
+    public PurgeResponse(final boolean success, final String message) {
+      this.success = success;
+      this.message = message;
+    }
+
+    public boolean isSuccess() {
+      return success;
+    }
+
+    public void setSuccess(final boolean success) {
+      this.success = success;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+
+    public void setMessage(final String message) {
+      this.message = message;
+    }
+
+    public long getEvictedMemory() {
+      return evictedMemory;
+    }
+
+    public void setEvictedMemory(final long evictedMemory) {
+      this.evictedMemory = evictedMemory;
+    }
+  }
+
+  /**
+   * DELETE method is used to purge the LLAP IO cache.
+   *
+   * <p>The outcome is collected in a PurgeResponse and written exactly once, after the purge has
+   * either finished or failed. On failure the 500 status is set before the body is written, and
+   * the writer is closed only after that single write.
+   *
+   * @param request The servlet request we are processing
+   * @param response The servlet response we are creating
+   */
+  @Override
+  public void doDelete(HttpServletRequest request, HttpServletResponse response) {
+    PurgeResponse purgeResponse = new PurgeResponse();
+    ObjectMapper objectMapper = new ObjectMapper();
+    objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+    PrintWriter writer = null;
+    try {
+      // NOTE(review): purge changes daemon state but is guarded only by the instrumentation access check — confirm this is the intended authorization level
+      if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(), request, response)) {
+        return;
+      }
+      response.setContentType("application/json; charset=utf8");
+      response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET,DELETE");
+      response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+
+      writer = response.getWriter();
+
+      try {
+        LlapIo llapIo = LlapProxy.getIo();
+        if (llapIo == null) {
+          purgeResponse.setSuccess(false);
+          purgeResponse.setMessage("LLAP IO not found");
+        } else {
+          long purgeMemory = llapIo.purge();
+          purgeResponse.setSuccess(true);
+          purgeResponse.setEvictedMemory(purgeMemory);
+          purgeResponse.setMessage("Successfully purged LLAP IO cache memory!");
+        }
+      } catch (Exception e) {
+        LOG.error("Caught exception while processing llap status request", e);
+        // set the status before any body is written so it can still take effect
+        response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+        purgeResponse.setSuccess(false);
+        purgeResponse.setMessage("Error while purging LLAP IO cache!");
+      }
+      // single write for every outcome; previously the "LLAP IO not found" reply was written twice
+      objectMapper.writerWithDefaultPrettyPrinter().writeValue(writer, purgeResponse);
+    } catch (Exception e) {
+      LOG.error("Caught exception while processing llap status request", e);
+      response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index e5bc3c2..109a046
100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -86,7 +86,6 @@
   public static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc");
   public static final Logger CACHE_LOGGER = LoggerFactory.getLogger("LlapIoCache");
   public static final Logger LOCKING_LOGGER = LoggerFactory.getLogger("LlapIoLocking");
-
   private static final String MODE_CACHE = "cache";

   // TODO: later, we may have a map
@@ -101,6 +100,7 @@
   private final LowLevelCache dataCache;
   private final BufferUsageManager bufferManager;
   private final Configuration daemonConf;
+  private LowLevelCachePolicy cachePolicy; // promoted from a constructor local so purge() can reach it; no initializer, so null until the constructor assigns it

   private LlapIoImpl(Configuration conf) throws IOException {
     this.daemonConf = conf;
@@ -139,7 +139,7 @@ private LlapIoImpl(Configuration conf) throws IOException {
       boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
       long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
       int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
-      LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
+      this.cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
           minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
       boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
       if (trackUsage) {
@@ -212,6 +212,14 @@ public String getMemoryInfo() {
   }

   @Override
+  public long purge() {
+    if (cachePolicy != null) { // NOTE(review): presumably null when the cache branch of the constructor is not taken — confirm
+      return cachePolicy.purge(); // delegate to the configured LRFU or FIFO policy
+    }
+    return 0; // no policy, so nothing was evicted
+  }
+
+  @Override
   public InputFormat getInputFormat(
       InputFormat sourceInputFormat, Deserializer sourceSerDe) {
     ColumnVectorProducer cvp = genericCvp;
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 2c87bc2..b19cdcf 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -120,6 +120,11 @@ public void setParentDebugDumper(LlapOomDebugDump dumper) {
     }

     @Override
+    public long purge() {
+      return 0; // test stub: reports nothing evicted
+    }
+
+    @Override
     public void debugDumpShort(StringBuilder sb) {
     }
   }
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index f7ebff2..58c918c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -63,6 +63,11 @@ public String debugDumpForOom() {
     public void setParentDebugDumper(LlapOomDebugDump dumper) {
     }

+    @Override
+    public long purge() {
+      return 0; // test stub: reports nothing evicted
+    }
+
     public void verifyEquals(int i) {
       assertEquals(i, lockCount);
       assertEquals(i, unlockCount);