Index: ql/src/java/org/apache/hadoop/hive/ql/processors/CompileProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/processors/CompileProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/processors/CompileProcessor.java (working copy)
@@ -0,0 +1,322 @@
+package org.apache.hadoop.hive.ql.processors;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.compress.archivers.jar.JarArchiveEntry;
+import org.apache.commons.compress.archivers.jar.JarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
+import org.apache.tools.ant.BuildException;
+import org.apache.tools.ant.Project;
+import org.apache.tools.ant.types.Path;
+import org.codehaus.groovy.ant.Groovyc;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Files;
+
+/**
+ * Processor that lets users build Groovy code on-the-fly inside a hive session,
+ * and then use this code as a UDF, Serde, or even a more complex entity like an
+ * input format or hook.
+ *
+ * Note: This class is stateful and not thread safe. Create a new
+ * CompileProcessor instance for each invocation.
+ */
+public class CompileProcessor implements CommandProcessor {
+
+  public static final Log LOG = LogFactory.getLog(CompileProcessor.class.getName());
+  public static final LogHelper console = new LogHelper(LOG);
+
+  private String lang;
+  private String code;
+  private String named;
+  private String command;
+  private int myId;
+
+  private static final String SYNTAX = "syntax: COMPILE ` some code here ` AS groovy NAMED something.groovy";
+  private static final AtomicInteger runCount;
+  public static final String IO_TMP_DIR = "java.io.tmpdir";
+  public static final String GROOVY = "GROOVY";
+  public static final String AS = "AS";
+  public static final String NAMED = "NAMED";
+
+  static {
+    runCount = new AtomicInteger(0);
+  }
+
+  @Override
+  public void init() {
+    // Nothing to initialize.
+  }
+
+  /**
+   * User supplies dynamic code in this format:
+   * compile ` some code here ` AS groovy named something.groovy;
+   * Hive will compile and package this code into a jar. The jar
+   * will be added to the session state via the session state's
+   * add resource command.
+   *
+   * @param command a String to be compiled
+   * @return CommandProcessorResponse with 0 for success and 1 for failure
+   */
+  @Override
+  public CommandProcessorResponse run(String command) throws CommandNeedRetryException {
+    SessionState ss = SessionState.get();
+    myId = runCount.getAndIncrement();
+    this.command = command;
+    try {
+      parse(ss);
+      return compile(ss);
+    } catch (CompileProcessorException e) {
+      return new CommandProcessorResponse(1, e.getMessage(), null);
+    }
+  }
+
+  /**
+   * Parses the supplied command into its code, language and file name parts.
+   * Inside the backticks a backslash escapes the character that follows it.
+   *
+   * @param ss session whose configuration is used for variable substitution, may be null
+   * @throws CompileProcessorException if the command is empty, does not follow
+   *           the expected syntax, or names a language other than groovy
+   */
+  @VisibleForTesting
+  void parse(SessionState ss) throws CompileProcessorException {
+    if (ss != null) {
+      command = new VariableSubstitution().substitute(ss.getConf(), command);
+    }
+    if (command == null || command.length() == 0) {
+      throw new CompileProcessorException("Command was empty");
+    }
+    // Without an opening backtick there is no code section to extract.
+    int openQuote = command.indexOf('`');
+    if (openQuote < 0) {
+      throw new CompileProcessorException(SYNTAX);
+    }
+    StringBuilder toCompile = new StringBuilder();
+    int endPosition = -1;
+    //TODO Escape handling will be changed in follow on
+    for (int i = openQuote + 1; i < command.length(); i++) {
+      char current = command.charAt(i);
+      if (current == '\\') {
+        // A trailing backslash has nothing to escape; reject it instead of
+        // reading past the end of the command.
+        if (i + 1 >= command.length()) {
+          throw new CompileProcessorException(SYNTAX);
+        }
+        toCompile.append(command.charAt(++i));
+      } else if (current == '`') {
+        endPosition = i;
+        break;
+      } else {
+        toCompile.append(current);
+      }
+    }
+    // The code section must be terminated by a closing backtick.
+    if (endPosition < 0) {
+      throw new CompileProcessorException(SYNTAX);
+    }
+    StringTokenizer st = new StringTokenizer(command.substring(endPosition + 1), " ");
+    if (st.countTokens() != 4) {
+      throw new CompileProcessorException(SYNTAX);
+    }
+    String shouldBeAs = st.nextToken();
+    if (!shouldBeAs.equalsIgnoreCase(AS)) {
+      throw new CompileProcessorException(SYNTAX);
+    }
+    setLang(st.nextToken());
+    if (!lang.equalsIgnoreCase(GROOVY)) {
+      throw new CompileProcessorException("Can not compile " + lang + ". Hive can only compile " + GROOVY);
+    }
+    String shouldBeNamed = st.nextToken();
+    if (!shouldBeNamed.equalsIgnoreCase(NAMED)) {
+      throw new CompileProcessorException(SYNTAX);
+    }
+    setNamed(st.nextToken());
+    setCode(toCompile.toString());
+  }
+
+  /**
+   * Writes the parsed code to a source file, compiles it with groovyc and
+   * packages the resulting class files into a jar under java.io.tmpdir.
+   * When a session is supplied the jar is added to it as a resource.
+   *
+   * @param ss session that receives the jar, may be null
+   * @return response with code 0 and the absolute path of the jar
+   * @throws CompileProcessorException if the temp directory is unusable, the
+   *           source can not be written or compiled, or the jar can not be made
+   */
+  @VisibleForTesting
+  CommandProcessorResponse compile(SessionState ss) throws CompileProcessorException {
+    if (named == null || code == null) {
+      throw new CompileProcessorException("Nothing to compile, code and name must be set");
+    }
+    Project proj = new Project();
+    String ioTempDir = System.getProperty(IO_TMP_DIR);
+    File ioTempFile = new File(ioTempDir);
+    if (!ioTempFile.exists()) {
+      throw new CompileProcessorException(ioTempDir + " does not exists");
+    }
+    if (!ioTempFile.isDirectory() || !ioTempFile.canWrite()) {
+      throw new CompileProcessorException(ioTempDir + " is not a writable directory");
+    }
+    Groovyc g = new Groovyc();
+    long runStamp = System.currentTimeMillis();
+    String jarId = myId + "_" + runStamp;
+    g.setProject(proj);
+    Path sourcePath = new Path(proj);
+    File destination = new File(ioTempFile, jarId + "out");
+    g.setDestdir(destination);
+    File input = new File(ioTempFile, jarId + "in");
+    sourcePath.setLocation(input);
+    g.setSrcdir(sourcePath);
+    makeDirectory(input);
+
+    File fileToWrite = new File(input, this.named);
+    // The name must be a plain file name so the source stays inside the
+    // per-run input directory that groovyc scans.
+    if (!input.equals(fileToWrite.getParentFile())) {
+      throw new CompileProcessorException(this.named + " is not a simple file name");
+    }
+    try {
+      Files.write(this.code, fileToWrite, Charset.forName("UTF-8"));
+    } catch (IOException e1) {
+      throw new CompileProcessorException("writing file", e1);
+    }
+    makeDirectory(destination);
+    try {
+      g.execute();
+    } catch (BuildException e) {
+      // Compilation errors surface as unchecked BuildExceptions; convert them
+      // so run() can report a failed response instead of propagating them.
+      throw new CompileProcessorException("Exception while compiling " + this.named
+          + ": " + e.getMessage(), e);
+    }
+
+    File testArchive = new File(ioTempFile, jarId + ".jar");
+    JarArchiveOutputStream out = null;
+    try {
+      out = new JarArchiveOutputStream(new FileOutputStream(testArchive));
+      addDirectoryToJar(out, destination, "");
+      out.finish();
+    } catch (IOException e) {
+      throw new CompileProcessorException("Exception while writing jar", e);
+    } finally {
+      if (out != null) {
+        try {
+          out.close();
+        } catch (IOException e) {
+          LOG.warn("Unable to close " + testArchive.getAbsolutePath(), e);
+        }
+      }
+    }
+
+    if (ss != null) {
+      ss.add_resource(ResourceType.JAR, testArchive.getAbsolutePath());
+    }
+    return new CommandProcessorResponse(0, testArchive.getAbsolutePath(), null);
+  }
+
+  /**
+   * Creates the given directory unless it already exists.
+   *
+   * @param dir directory to create
+   * @throws CompileProcessorException if the directory can not be created
+   */
+  private static void makeDirectory(File dir) throws CompileProcessorException {
+    if (!dir.mkdir() && !dir.isDirectory()) {
+      throw new CompileProcessorException("Unable to create directory " + dir.getAbsolutePath());
+    }
+  }
+
+  /**
+   * Adds every file below a directory to the jar. Sub directories are walked
+   * recursively so that classes declared inside a package are included.
+   *
+   * @param out jar stream to write to
+   * @param dir directory whose content is added
+   * @param prefix entry name prefix for the content of dir, empty or ending in "/"
+   * @throws IOException if the directory can not be listed or a file can not be copied
+   */
+  private static void addDirectoryToJar(JarArchiveOutputStream out, File dir, String prefix)
+      throws IOException {
+    File[] children = dir.listFiles();
+    if (children == null) {
+      throw new IOException("Unable to list contents of " + dir.getAbsolutePath());
+    }
+    for (File child : children) {
+      String entryName = prefix + child.getName();
+      if (child.isDirectory()) {
+        addDirectoryToJar(out, child, entryName + "/");
+        continue;
+      }
+      FileInputStream fis = new FileInputStream(child);
+      try {
+        out.putArchiveEntry(new JarArchiveEntry(entryName));
+        IOUtils.copy(fis, out);
+        out.closeArchiveEntry();
+      } finally {
+        // Release the file handle even when the copy fails.
+        IOUtils.closeQuietly(fis);
+      }
+    }
+  }
+
+  public String getLang() {
+    return lang;
+  }
+
+  public void setLang(String lang) {
+    this.lang = lang;
+  }
+
+  public String getCode() {
+    return code;
+  }
+
+  public void setCode(String code) {
+    this.code = code;
+  }
+
+  public String getNamed() {
+    return named;
+  }
+
+  public void setNamed(String named) {
+    this.named = named;
+  }
+
+  public String getCommand() {
+    return command;
+  }
+
+  /**
+   * Signals that a command could not be parsed, compiled or packaged.
+   * Declared static so instances do not retain the enclosing processor.
+   */
+  static class CompileProcessorException extends HiveException {
+
+    private static final long serialVersionUID = 1L;
+
+    CompileProcessorException(String s, Throwable t) {
+      super(s, t);
+    }
+
+    CompileProcessorException(String s) {
+      super(s);
+    }
+  }
+}
Index: ivy/libraries.properties
===================================================================
--- ivy/libraries.properties (revision 1523187)
+++ ivy/libraries.properties (working copy)
@@ -42,6 +42,7 @@
 commons-pool.version=1.5.4
 derby.version=10.4.2.0
 guava.version=11.0.2
+groovy.version=2.1.6
 hbase.version=0.94.6.1
 jackson.version=1.8.8
 javaewah.version=0.3.2
Index: ql/ivy.xml
===================================================================
--- ql/ivy.xml (revision 1523187)
+++ ql/ivy.xml (working copy)
@@ -49,6 +49,9 @@
+
+