diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index deb9905..13b5113 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -435,7 +435,6 @@ minitez.query.files=bucket_map_join_tez1.q,\ tez_smb_main.q,\ tez_smb_1.q,\ tez_smb_empty.q,\ - udf_get_splits.q,\ vector_join_part_col_char.q,\ vectorized_dynamic_partition_pruning.q,\ tez_multi_union.q,\ @@ -495,7 +494,8 @@ minillap.query.files=bucket_map_join_tez1.q,\ vectorized_dynamic_partition_pruning.q,\ tez_multi_union.q,\ tez_join.q,\ - tez_union_multiinsert.q + tez_union_multiinsert.q,\ + udtf_get_splits.q encrypted.query.files=encryption_join_unencrypted_tbl.q,\ encryption_insert_partition_static.q,\ diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 5e81e98..e524bd2 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -1460,7 +1460,8 @@ private void maskPatterns(Pattern[] patterns, String fname) throws Exception { ".*Input:.*/data/files/.*", ".*Output:.*/data/files/.*", ".*total number of created files now is.*", - ".*.hive-staging.*" + ".*.hive-staging.*", + "table_.*" }); private final Pattern[] partialReservedPlanMask = toPattern(new String[] { diff --git jdbc/src/java/org/apache/hive/jdbc/LlapDump.java jdbc/src/java/org/apache/hive/jdbc/LlapDump.java index a807f6c..7ed0a0e 100644 --- jdbc/src/java/org/apache/hive/jdbc/LlapDump.java +++ jdbc/src/java/org/apache/hive/jdbc/LlapDump.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.FileInputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -61,6 +62,7 @@ private static String user = "hive"; private static String pwd = ""; private static String query = "select * from test"; + private static String numSplits = "1"; public static void main(String[] args) throws Exception { Options opts = createOptions(); @@ -84,6 +86,10 @@ public static void main(String[] args) throws Exception { pwd = cli.getOptionValue("p"); } + if (cli.hasOption('n')) { + numSplits = cli.getOptionValue("n"); + } + if (cli.getArgs().length > 0) { query = cli.getArgs()[0]; } @@ -95,7 +101,7 @@ public static void main(String[] args) throws Exception { LlapInputFormat format = new LlapInputFormat(url, user, pwd, query); JobConf job = new JobConf(); - InputSplit[] splits = format.getSplits(job, 1); + InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits)); if (splits.length == 0) { System.out.println("No splits returned - empty scan"); @@ -104,6 +110,7 @@ public static void main(String[] args) throws Exception { boolean first = true; for (InputSplit s: splits) { + LOG.info("Processing input split s from " + Arrays.toString(s.getLocations())); RecordReader reader = format.getRecordReader(s, job, null); if (reader instanceof LlapRecordReader && first) { @@ -122,6 +129,7 @@ public static void main(String[] args) throws Exception { System.out.println(value); } } + System.exit(0); } } @@ -146,6 +154,12 @@ static Options createOptions() { .hasArg() .create('p')); + result.addOption(OptionBuilder + .withLongOpt("num") + .withDescription("number of splits") + .hasArg() + .create('n')); + return result; } } diff --git jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java 
jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java index 5af2175..9a7c16d 100644 --- jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java +++ jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java @@ -64,6 +64,8 @@ public final String USER_KEY = "llap.if.user"; public final String PWD_KEY = "llap.if.pwd"; + public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; + private Connection con; private Statement stmt; @@ -105,7 +107,7 @@ public LlapInputFormat() {} try { con = DriverManager.getConnection(url,user,pwd); stmt = con.createStatement(); - String sql = "select r.if_class as ic, r.split_class as sc, r.split as s from (select explode(get_splits(\""+query+"\","+numSplits+")) as r) t"; + String sql = String.format(SPLIT_QUERY, query, numSplits); ResultSet res = stmt.executeQuery(sql); while (res.next()) { // deserialize split diff --git llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java index d8066d5..b32d662 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java @@ -67,7 +67,7 @@ public LlapInputFormat() { */ @Override public RecordReader getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { + Reporter reporter) throws IOException { LlapInputSplit llapSplit = (LlapInputSplit) split; SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes()); @@ -75,22 +75,15 @@ public LlapInputFormat() { // TODO HACK: Spark is built with Hive-1.2.1, does not have access to HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT int llapSubmitPort = job.getInt("hive.llap.daemon.rpc.port", 15001); - LOG.info("ZZZ: DBG: Starting LlapTaskUmbilicalExternalClient"); - LlapTaskUmbilicalExternalClient llapClient = - new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), - submitWorkInfo.getToken()); + new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), + submitWorkInfo.getToken()); llapClient.init(job); llapClient.start(); - LOG.info("ZZZ: DBG: Crated LlapClient"); - // TODO KKK Shutdown the llap client. 
- SubmitWorkRequestProto submitWorkRequestProto = - constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), - llapClient.getAddress(), submitWorkInfo.getToken()); - - LOG.info("ZZZ: DBG: Created submitWorkRequest for: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()); + constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), + llapClient.getAddress(), submitWorkInfo.getToken()); TezEvent tezEvent = new TezEvent(); DataInputBuffer dib = new DataInputBuffer(); @@ -116,7 +109,7 @@ public LlapInputFormat() { socket.getOutputStream().write(0); socket.getOutputStream().flush(); - LOG.debug("Registered id: " + id); + LOG.info("Registered id: " + id); return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); } @@ -127,16 +120,16 @@ public LlapInputFormat() { } private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, - int taskNum, - InetSocketAddress address, - Token token) throws - IOException { + int taskNum, + InetSocketAddress address, + Token token) throws + IOException { TaskSpec taskSpec = submitWorkInfo.getTaskSpec(); ApplicationId appId = submitWorkInfo.getFakeAppId(); SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); // This works, assuming the executor is running within YARN. - LOG.info("DBG: Setting user in submitWorkRequest to: " + + LOG.info("Setting user in submitWorkRequest to: " + System.getenv(ApplicationConstants.Environment.USER.name())); builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); builder.setApplicationIdString(appId.toString()); @@ -144,10 +137,9 @@ private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo su builder.setTokenIdentifier(appId.toString()); ContainerId containerId = - ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); + ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); builder.setContainerIdString(containerId.toString()); - builder.setAmHost(address.getHostName()); builder.setAmPort(address.getPort()); Credentials taskCredentials = new Credentials(); @@ -155,18 +147,18 @@ private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo su // TODO Figure out where credentials will come from. Normally Hive sets up // URLs on the tez dag, for which Tez acquires credentials. 
-// taskCredentials.addAll(getContext().getCredentials()); - -// Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == -// taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); -// ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); -// if (credentialsBinary == null) { -// credentialsBinary = serializeCredentials(getContext().getCredentials()); -// credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); -// } else { -// credentialsBinary = credentialsBinary.duplicate(); -// } -// builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + // taskCredentials.addAll(getContext().getCredentials()); + + // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == + // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); + // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); + // if (credentialsBinary == null) { + // credentialsBinary = serializeCredentials(getContext().getCredentials()); + // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); + // } else { + // credentialsBinary = credentialsBinary.duplicate(); + // } + // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); Credentials credentials = new Credentials(); TokenCache.setSessionToken(token, credentials); ByteBuffer credentialsBinary = serializeCredentials(credentials); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index e80fb15..d47355a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -265,10 +265,10 @@ public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto reques List knownFragments = queryTracker .queryComplete(queryIdentifier, request.getDeleteDelay()); - LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier, + LOG.info("Pending fragment count for completed query {} = {}", queryIdentifier, knownFragments.size()); for (QueryFragmentInfo fragmentInfo : knownFragments) { - LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier, + LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier, fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } @@ -277,7 +277,7 @@ public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto reques @Override public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) { - LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString()); + LOG.info("Received terminateFragment request for {}", request.getFragmentIdentifierString()); executorService.killFragment(request.getFragmentIdentifierString()); return TerminateFragmentResponseProto.getDefaultInstance(); } @@ -356,10 +356,10 @@ public void queryFailed(QueryIdentifier queryIdentifier) { LOG.info("Processing query failed notification for {}", queryIdentifier); List knownFragments = queryTracker.queryComplete(queryIdentifier, -1); - LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier, + LOG.info("Pending fragment count for failed query {} = {}", queryIdentifier, knownFragments.size()); for 
(QueryFragmentInfo fragmentInfo : knownFragments) { - LOG.info("DBG: Issuing killFragment for failed query {} {}", queryIdentifier, + LOG.info("Issuing killFragment for failed query {} {}", queryIdentifier, fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index 4305682..16cfd01 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -88,12 +88,6 @@ public InetSocketAddress getAddress() { public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List tezEvents) { Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false); - - LOG.warn("ZZZ: DBG: " + " Submitting fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " on host: " + llapHost + ", port=" + llapPort); -// LOG.info("ZZZ: DBG: " + " Complete SubmitWorkRequest: " + submitWorkRequestProto); -// submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() - - LOG.info("ZZZ: DBG: Received {} events for {}", tezEvents.size(), submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()); // Register the pending events to be sent for this spec. pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents); @@ -109,7 +103,6 @@ public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto respons return; } } - LOG.info("DBG: Submitted " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()); } @Override @@ -166,7 +159,6 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce TezHeartbeatResponse response = new TezHeartbeatResponse(); // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this. TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); - LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString()); List tezEvents = pendingEvents.remove(taskAttemptId.toString()); if (tezEvents == null) { diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 76d095a..91e4323 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -349,7 +349,7 @@ public void unregisterRunningTaskAttempt(final TezTaskAttemptID taskAttemptId, private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, boolean invokedByContainerEnd) { LOG.info( - "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", + "Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", taskAttemptId.toString(), invokedByContainerEnd ? 
"containerEnd" : "taskEnd"); LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId); // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 3bca0da..e1ad12d 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -931,7 +931,7 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten } } else { // No tasks qualify as preemptable - LOG.info("DBG: No tasks qualify as killable to schedule tasks at priority {}", forPriority); + LOG.info("No tasks qualify as killable to schedule tasks at priority {}", forPriority); break; } } @@ -941,7 +941,7 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten // Send out the preempted request outside of the lock. if (preemptedTaskList != null) { for (TaskInfo taskInfo : preemptedTaskList) { - LOG.info("DBG: Preempting task {}", taskInfo); + LOG.info("Preempting task {}", taskInfo); getContext().preemptContainer(taskInfo.containerId); // Preemption will finally be registered as a deallocateTask as a result of preemptContainer // That resets preemption info and allows additional tasks to be pre-empted if required. diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java deleted file mode 100644 index d308ec8..0000000 --- ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.llap; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.Set; - -import javax.security.auth.login.LoginException; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TaskSpecBuilder; -import org.apache.tez.runtime.api.impl.TaskSpec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.Socket; - -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; - -import com.esotericsoftware.kryo.Kryo; -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; -import java.io.InputStream; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URISyntaxException; -import java.io.FileNotFoundException; -import java.util.UUID; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.io.FilenameUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.Vertex; -import org.apache.tez.runtime.api.Event; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.exec.tez.DagUtils; -import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.TezWork; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.tez.dag.api.TaskLocationHint; -import org.apache.tez.dag.api.VertexLocationHint; -import org.apache.tez.dag.api.event.VertexStateUpdate; -import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem; -import org.apache.tez.mapreduce.hadoop.MRInputHelpers; -import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto; -import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; -import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.InputInitializer; -import org.apache.tez.runtime.api.InputInitializerContext; -import org.apache.tez.runtime.api.InputSpecUpdate; -import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; -import org.apache.tez.runtime.api.events.InputDataInformationEvent; -import org.apache.tez.runtime.api.events.InputInitializerEvent; - -import com.google.common.base.Preconditions; - -public class LlapInputFormat implements InputFormat { - - private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); - - private final TezWork work; - private final Schema schema; - - public LlapInputFormat(TezWork tezWork, Schema schema) { - this.work = tezWork; - this.schema = schema; - } - - // need empty 
constructor for bean instantiation - public LlapInputFormat() { - // None of these fields should be required during getRecordReader, - // and should not be read. - work = null; - schema = null; - } - - /* - * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire - * off the work in the split to LLAP and finally return the connected socket back in an - * LlapRecordReader. The LlapRecordReader class reads the results from the socket. - */ - @Override - public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { - - // Calls a static method to ensure none of the object fields are read. - return _getRecordReader(split, job, reporter); - } - - private static RecordReader _getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws - IOException { - LlapInputSplit llapSplit = (LlapInputSplit)split; - - // TODO: push event into LLAP - - // this is just the portion that sets up the io to receive data - String host = split.getLocations()[0]; - - // TODO: need to construct id here. Format is queryId + "_" + taskIndex - String id = "foobar"; - - HiveConf conf = new HiveConf(); - Socket socket = new Socket(host, - conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)); - - LOG.debug("Socket connected"); - - socket.getOutputStream().write(id.getBytes()); - socket.getOutputStream().write(0); - socket.getOutputStream().flush(); - - LOG.debug("Registered id: " + id); - - return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); - } - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - throw new IOException("These are not the splits you are looking for."); - } -} diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 4f38ff1..a197d7b 100644 --- ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -81,45 +81,45 @@ public static LlapOutputFormatService get() throws IOException { public void start() throws IOException { executor.submit(new Runnable() { - byte[] buffer = new byte[4096]; - @Override - public void run() { - while (true) { - Socket s = null; - try { - s = socket.accept(); - String id = readId(s); - LOG.debug("Received: "+id); - registerReader(s, id); - } catch (IOException io) { - if (s != null) { - try{ - s.close(); - } catch (IOException io2) { - // ignore - } - } - } - } - } + byte[] buffer = new byte[4096]; + @Override + public void run() { + while (true) { + Socket s = null; + try { + s = socket.accept(); + String id = readId(s); + LOG.debug("Received: "+id); + registerReader(s, id); + } catch (IOException io) { + if (s != null) { + try{ + s.close(); + } catch (IOException io2) { + // ignore + } + } + } + } + } - private String readId(Socket s) throws IOException { - InputStream in = s.getInputStream(); - int idx = 0; - while((buffer[idx++] = (byte)in.read()) != '\0') {} - return new String(buffer,0,idx-1); - } + private String readId(Socket s) throws IOException { + InputStream in = s.getInputStream(); + int idx = 0; + while((buffer[idx++] = (byte)in.read()) != '\0') {} + return new String(buffer,0,idx-1); + } - private void registerReader(Socket s, String id) throws IOException { - synchronized(service) { - LOG.debug("registering socket for: "+id); - LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream()); 
- writers.put(id, writer); - service.notifyAll(); + private void registerReader(Socket s, String id) throws IOException { + synchronized(service) { + LOG.debug("registering socket for: "+id); + LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream()); + writers.put(id, writer); + service.notifyAll(); + } + } } - } - } - ); + ); } public void stop() throws IOException, InterruptedException { @@ -132,10 +132,11 @@ public void stop() throws IOException, InterruptedException { RecordWriter writer = null; synchronized(service) { while ((writer = writers.get(id)) == null) { - LOG.debug("Waiting for writer for: "+id); - service.wait(); + LOG.info("Waiting for writer for: "+id); + service.wait(); } } + LOG.info("Returning writer for: "+id); return writer; } } diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java index 4d1996c..b632fae 100644 --- ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java +++ ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java @@ -20,19 +20,18 @@ import java.io.IOException; import java.io.OutputStream; -import java.io.DataOutputStream;; +import java.io.DataOutputStream; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LlapRecordWriter implements RecordWriter { + public static final Logger LOG = LoggerFactory.getLogger(LlapRecordWriter.class); DataOutputStream dos; @@ -42,6 +41,7 @@ public LlapRecordWriter(OutputStream out) { @Override public void close(Reporter reporter) throws IOException { + LOG.info("CLOSING the record writer output stream"); dos.close(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 0899793..02439be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -36,11 +36,13 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.LlapOutputFormat; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -202,6 +204,17 @@ public void closeWriters(boolean abort) throws HiveException { } } try { + if ("org.apache.hadoop.hive.llap.LlapStorageHandler".equals(getConf().getTableInfo().getProperties(). + get(hive_metastoreConstants.META_TABLE_STORAGE))) { + (new LlapOutputFormat()) + .getRecordWriter(null, + hconf instanceof JobConf ? 
(JobConf) hconf : new JobConf(hconf), null, null) + .close(null); + } + } catch (IOException e) { + // ignored + } + try { for (int i = 0; i < updaters.length; i++) { if (updaters[i] != null) { updaters[i].close(abort); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index f3afa24..c782466 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -344,8 +344,6 @@ system.registerGenericUDF("ewah_bitmap_or", GenericUDFEWAHBitmapOr.class); system.registerGenericUDF("ewah_bitmap_empty", GenericUDFEWAHBitmapEmpty.class); - system.registerGenericUDF("get_splits", GenericUDFGetSplits.class); - // Aliases for Java Class Names // These are used in getImplicitConvertUDFMethod system.registerUDF(serdeConstants.BOOLEAN_TYPE_NAME, UDFToBoolean.class, false, UDFToBoolean.class.getSimpleName()); @@ -444,6 +442,8 @@ system.registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class); system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class); system.registerGenericUDTF("stack", GenericUDTFStack.class); + system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class); + system.registerGenericUDTF("execute_splits", GenericUDTFExecuteSplits.class); //PTF declarations system.registerGenericUDF(LEAD_FUNC_NAME, GenericUDFLead.class); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 011e459..b16368f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -92,6 +92,7 @@ public void initializeSplitGenerator(Configuration conf, MapWork work) throws IO this.conf = conf; this.work = work; + this.jobConf = new JobConf(conf); // TODO RSHACK - assuming grouping enabled always. 
userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 3fe70ab..7a3d6a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -95,7 +95,9 @@ public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) t super(jconf, context); String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); if (LlapProxy.isDaemon()) { // do not cache plan - jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, queryId + "_" + context.getTaskIndex()); + String id = queryId + "_" + context.getTaskIndex(); + l4j.info("LLAP_OF_ID: "+id); + jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, id); cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); } else { cache = ObjectCacheFactory.getCache(jconf, queryId); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java index 454c321..6d00a0a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java @@ -23,11 +23,14 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HivePassThroughRecordWriter , V extends Writable> implements RecordWriter { + public static final Logger LOG = LoggerFactory.getLogger(HivePassThroughRecordWriter.class); private final org.apache.hadoop.mapred.RecordWriter mWriter; public HivePassThroughRecordWriter(org.apache.hadoop.mapred.RecordWriter writer) { @@ -42,6 +45,7 @@ public void write(Writable r) throws IOException { @Override public void close(boolean abort) throws IOException { + LOG.info("Closing the pass through writer."); //close with null reporter mWriter.close(null); } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java deleted file mode 100644 index f69dea3..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java +++ /dev/null @@ -1,412 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.udf.generic; - -import javax.security.auth.login.LoginException; -import java.io.ByteArrayOutputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.UUID; - -import com.esotericsoftware.kryo.Kryo; -import com.google.common.base.Preconditions; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.io.FilenameUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.LlapInputSplit; -import org.apache.hadoop.hive.llap.LlapOutputFormat; -import org.apache.hadoop.hive.llap.SubmitWorkInfo; -import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.MapredContext; -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.UDFArgumentException; -import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; -import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; -import org.apache.hadoop.hive.ql.exec.tez.DagUtils; -import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator; -import org.apache.hadoop.hive.ql.exec.tez.TezTask; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.TezWork; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.udf.UDFType; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.TaskLocationHint; -import org.apache.tez.dag.api.TaskSpecBuilder; -import org.apache.tez.dag.api.Vertex; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; -import org.apache.tez.runtime.api.impl.EventMetaData; -import org.apache.tez.runtime.api.impl.TaskSpec; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.slf4j.Logger; 
-import org.slf4j.LoggerFactory; - -/** - * GenericUDFGetSplits. - * - */ -@Description(name = "get_splits", value = "_FUNC_(string,int) - " - + "Returns an array of length int serialized splits for the referenced tables string.") -@UDFType(deterministic = false) -public class GenericUDFGetSplits extends GenericUDF { - - private static final Logger LOG = LoggerFactory.getLogger(GenericUDFGetSplits.class); - - private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat"; - - private transient StringObjectInspector stringOI; - private transient IntObjectInspector intOI; - private final ArrayList retArray = new ArrayList(); - private transient JobConf jc; - private transient Hive db; - private ByteArrayOutputStream bos; - private DataOutput dos; - - @Override - public ObjectInspector initialize(ObjectInspector[] arguments) - throws UDFArgumentException { - - LOG.debug("initializing GenericUDFGetSplits"); - - try { - if (SessionState.get() != null && SessionState.get().getConf() != null) { - HiveConf conf = SessionState.get().getConf(); - jc = DagUtils.getInstance().createConfiguration(conf); - db = Hive.get(conf); - } else { - jc = MapredContext.get().getJobConf(); - db = Hive.get(); - } - } catch(Exception e) { - LOG.error("Failed to initialize: ",e); - throw new UDFArgumentException(e); - } - - LOG.debug("Initialized conf, jc and metastore connection"); - - if (arguments.length != 2) { - throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments."); - } else if (!(arguments[0] instanceof StringObjectInspector)) { - LOG.error("Got "+arguments[0].getTypeName()+" instead of string."); - throw new UDFArgumentTypeException(0, "\"" - + "string\" is expected at function GET_SPLITS, " + "but \"" - + arguments[0].getTypeName() + "\" is found"); - } else if (!(arguments[1] instanceof IntObjectInspector)) { - LOG.error("Got "+arguments[1].getTypeName()+" instead of int."); - throw new UDFArgumentTypeException(1, "\"" - + "int\" is expected at function GET_SPLITS, " + "but \"" - + arguments[1].getTypeName() + "\" is found"); - } - - stringOI = (StringObjectInspector) arguments[0]; - intOI = (IntObjectInspector) arguments[1]; - - List names = Arrays.asList("if_class","split_class","split"); - List fieldOIs = Arrays.asList( - PrimitiveObjectInspectorFactory.javaStringObjectInspector, - PrimitiveObjectInspectorFactory.javaStringObjectInspector, - PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); - ObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs); - ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(outputOI); - bos = new ByteArrayOutputStream(1024); - dos = new DataOutputStream(bos); - - LOG.debug("done initializing GenericUDFGetSplits"); - return listOI; - } - - @Override - public Object evaluate(DeferredObject[] arguments) throws HiveException { - retArray.clear(); - - String query = stringOI.getPrimitiveJavaObject(arguments[0].get()); - - int num = intOI.get(arguments[1].get()); - - Driver driver = new Driver(); - CommandProcessorResponse cpr; - - HiveConf conf = SessionState.get().getConf(); - - if (conf == null) { - throw new HiveException("Need configuration"); - } - - String fetchTaskConversion = HiveConf.getVar(conf, ConfVars.HIVEFETCHTASKCONVERSION); - String queryResultFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - - try { - LOG.info("setting fetch.task.conversion to none and query file format to 
\""+LlapOutputFormat.class.getName()+"\""); - HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none"); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap"); - - cpr = driver.compileAndRespond(query); - if(cpr.getResponseCode() != 0) { - throw new HiveException("Failed to compile query: "+cpr.getException()); - } - - QueryPlan plan = driver.getPlan(); - List> roots = plan.getRootTasks(); - Schema schema = plan.getResultSchema(); - - if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { - throw new HiveException("Was expecting a single TezTask."); - } - - TezWork tezWork = ((TezTask)roots.get(0)).getWork(); - - if (tezWork.getAllWork().size() != 1) { - - String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", ""); - - String ctas = "create temporary table "+tableName+" as "+query; - LOG.info("CTAS: "+ctas); - - try { - cpr = driver.run(ctas, false); - } catch(CommandNeedRetryException e) { - throw new HiveException(e); - } - - if(cpr.getResponseCode() != 0) { - throw new HiveException("Failed to create temp table: " + cpr.getException()); - } - - query = "select * from " + tableName; - cpr = driver.compileAndRespond(query); - if(cpr.getResponseCode() != 0) { - throw new HiveException("Failed to create temp table: "+cpr.getException()); - } - - plan = driver.getPlan(); - roots = plan.getRootTasks(); - schema = plan.getResultSchema(); - - if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { - throw new HiveException("Was expecting a single TezTask."); - } - - tezWork = ((TezTask)roots.get(0)).getWork(); - } - - MapWork w = (MapWork)tezWork.getAllWork().get(0); - - try { - for (InputSplit s: getSplits(jc, num, tezWork, schema)) { - Object[] os = new Object[3]; - os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME; - os[1] = s.getClass().getName(); - bos.reset(); - s.write(dos); - byte[] frozen = bos.toByteArray(); - os[2] = frozen; - retArray.add(os); - } - } catch(Exception e) { - throw new HiveException(e); - } - } finally { - HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, fetchTaskConversion); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, queryResultFormat); - } - return retArray; - } - - public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema) throws IOException { - DAG dag = DAG.create(work.getName()); - dag.setCredentials(job.getCredentials()); - // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag); - - DagUtils utils = DagUtils.getInstance(); - Context ctx = new Context(job); - MapWork mapWork = (MapWork) work.getAllWork().get(0); - // bunch of things get setup in the context based on conf but we need only the MR tmp directory - // for the following method. 
- JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork); - Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job); - FileSystem fs = scratchDir.getFileSystem(job); - try { - LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job); - Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr, - new ArrayList(), fs, ctx, false, work, - work.getVertexType(mapWork)); - String vertexName = wx.getName(); - dag.addVertex(wx); - utils.addCredentials(mapWork, dag); - - - // we have the dag now proceed to get the splits: - HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null); - Preconditions.checkState(HiveConf.getBoolVar(wxConf, - HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS)); - Preconditions.checkState(HiveConf.getBoolVar(wxConf, - HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS)); - splitGenerator.initializeSplitGenerator(wxConf, mapWork); - List eventList = splitGenerator.initialize(); - - // hack - just serializing with kryo for now. This needs to be done properly - InputSplit[] result = new InputSplit[eventList.size() - 1]; - DataOutputBuffer dob = new DataOutputBuffer(); - - InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0); - - List hints = configureEvent.getLocationHint().getTaskLocationHints(); - - Preconditions.checkState(hints.size() == eventList.size() -1); - - LOG.error("DBG: NumEvents=" + eventList.size()); - LOG.error("DBG: NumSplits=" + result.length); - - ApplicationId fakeApplicationId = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0); - TaskSpec taskSpec = - new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1, fakeApplicationId); - - SubmitWorkInfo submitWorkInfo = - new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis()); - EventMetaData sourceMetaData = - new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName, - "NULL_VERTEX", null); - EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx); - - LOG.info("DBG: Number of splits: " + (eventList.size() - 1)); - for (int i = 0; i < eventList.size() - 1; i++) { - // Creating the TezEvent here itself, since it's easy to serialize. - Event event = eventList.get(i + 1); - TaskLocationHint hint = hints.get(i); - Set hosts = hint.getHosts(); - LOG.info("DBG: Using locations: " + hosts.toString()); - if (hosts.size() != 1) { - LOG.warn("DBG: Bad # of locations: " + hosts.size()); - } - SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()]; - - int j = 0; - for (String host : hosts) { - locations[j++] = new SplitLocationInfo(host, false); - } - TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis()); - tezEvent.setDestinationInfo(destinationMetaInfo); - - bos.reset(); - dob.reset(); - tezEvent.write(dob); - - byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo); - - result[i] = - new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema); - } - return result; - } catch (Exception e) { - throw new IOException(e); - } - } - - /** - * Returns a local resource representing a jar. This resource will be used to execute the plan on - * the cluster. - * - * @param localJarPath - * Local path to the jar to be localized. - * @return LocalResource corresponding to the localized hive exec resource. - * @throws IOException - * when any file system related call fails. - * @throws LoginException - * when we are unable to determine the user. 
- * @throws URISyntaxException - * when current jar location cannot be determined. - */ - private LocalResource createJarLocalResource(String localJarPath, DagUtils utils, - Configuration conf) - throws IOException, LoginException, IllegalArgumentException, FileNotFoundException { - FileStatus destDirStatus = utils.getHiveJarDirectory(conf); - assert destDirStatus != null; - Path destDirPath = destDirStatus.getPath(); - - Path localFile = new Path(localJarPath); - String sha = getSha(localFile, conf); - - String destFileName = localFile.getName(); - - // Now, try to find the file based on SHA and name. Currently we require exact name match. - // We could also allow cutting off versions and other stuff provided that SHA matches... - destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha - + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName); - - // TODO: if this method is ever called on more than one jar, getting the dir and the - // list need to be refactored out to be done only once. - Path destFile = new Path(destDirPath.toString() + "/" + destFileName); - return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); - } - - private String getSha(Path localFile, Configuration conf) - throws IOException, IllegalArgumentException { - InputStream is = null; - try { - FileSystem localFs = FileSystem.getLocal(conf); - is = localFs.open(localFile); - return DigestUtils.sha256Hex(is); - } finally { - if (is != null) { - is.close(); - } - } - } - - @Override - public String getDisplayString(String[] children) { - assert children.length == 2; - return getStandardDisplayString("get_splits", children); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java new file mode 100644 index 0000000..12759ab --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFGetSplits.PlanFragment; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * GenericUDTFExecuteSplits. + * + */ +@Description(name = "execute_splits", value = "_FUNC_(string,int) - " + + "Returns an array of length int serialized splits for the referenced tables string.") +@UDFType(deterministic = false) +public class GenericUDTFExecuteSplits extends GenericUDTFGetSplits { + + private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFExecuteSplits.class); + + @Override + public StructObjectInspector initialize(ObjectInspector[] arguments) + throws UDFArgumentException { + + LOG.debug("initializing ExecuteSplits"); + + if (SessionState.get() == null || SessionState.get().getConf() == null) { + throw new IllegalStateException("Cannot run execute splits outside HS2"); + } + + if (arguments.length != 2) { + throw new UDFArgumentLengthException("The function execute_splits accepts 2 arguments."); + } else if (!(arguments[0] instanceof StringObjectInspector)) { + LOG.error("Got "+arguments[0].getTypeName()+" instead of string."); + throw new UDFArgumentTypeException(0, "\"" + + "string\" is expected at function execute_splits, " + "but \"" + + arguments[0].getTypeName() + "\" is found"); + } else if (!(arguments[1] instanceof IntObjectInspector)) { + LOG.error("Got "+arguments[1].getTypeName()+" instead of int."); + throw new UDFArgumentTypeException(1, "\"" + + "int\" is expected at function execute_splits, " + "but \"" + + arguments[1].getTypeName() + "\" is found"); + } + + stringOI = (StringObjectInspector) arguments[0]; + intOI = (IntObjectInspector) arguments[1]; + + List names = Arrays.asList("split_num","value"); + List fieldOIs = Arrays.asList( + PrimitiveObjectInspectorFactory.javaIntObjectInspector, + PrimitiveObjectInspectorFactory.javaStringObjectInspector); + StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs); + + LOG.debug("done initializing GenericUDTFExecuteSplits"); + return outputOI; + } + + @Override + public void process(Object[] arguments) throws HiveException { + + String query = stringOI.getPrimitiveJavaObject(arguments[0]); + int num = intOI.get(arguments[1]); + + PlanFragment fragment = createPlanFragment(query, num); + try { + 
InputFormat format = (InputFormat)(Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance()); + int index = 0; + for (InputSplit s: getSplits(jc, num, fragment.work, fragment.schema)) { + RecordReader reader = format.getRecordReader(s,fragment.jc,null); + Text value = reader.createValue(); + NullWritable key = reader.createKey(); + index++; + while(reader.next(key,value)) { + Object[] os = new Object[2]; + os[0] = index; + os[1] = value.toString(); + forward(os); + } + } + } catch(Exception e) { + throw new HiveException(e); + } + } + + @Override + public void close() throws HiveException { + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java new file mode 100644 index 0000000..ebb0ca5 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; + +import javax.security.auth.login.LoginException; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapInputSplit; +import org.apache.hadoop.hive.llap.LlapOutputFormat; +import org.apache.hadoop.hive.llap.SubmitWorkInfo; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.metadata.Hive; +import 
org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.dag.api.TaskSpecBuilder; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * GenericUDTFGetSplits. + * + */ +@Description(name = "get_splits", value = "_FUNC_(string,int) - " + + "Returns an array of length int serialized splits for the referenced tables string.") +@UDFType(deterministic = false) +public class GenericUDTFGetSplits extends GenericUDTF { + + private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits.class); + + private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat"; + + protected transient StringObjectInspector stringOI; + protected transient IntObjectInspector intOI; + protected transient JobConf jc; + private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); + private DataOutput dos = new DataOutputStream(bos); + + @Override + public StructObjectInspector initialize(ObjectInspector[] arguments) + throws UDFArgumentException { + + LOG.debug("initializing GenericUDFGetSplits"); + + if (SessionState.get() == null || SessionState.get().getConf() == null) { + throw new IllegalStateException("Cannot run get splits outside HS2"); + } + + LOG.debug("Initialized conf, jc and metastore connection"); + + if (arguments.length != 2) { + throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments."); + } else if (!(arguments[0] instanceof StringObjectInspector)) { + LOG.error("Got "+arguments[0].getTypeName()+" instead of string."); + throw new UDFArgumentTypeException(0, "\"" + + "string\" is expected at function GET_SPLITS, " + "but \"" + + arguments[0].getTypeName() + "\" is found"); + } else if (!(arguments[1] instanceof IntObjectInspector)) { + LOG.error("Got "+arguments[1].getTypeName()+" instead of int."); + throw new UDFArgumentTypeException(1, "\"" + + "int\" is expected at function GET_SPLITS, " + "but \"" + + arguments[1].getTypeName() + "\" 
is found"); + } + + stringOI = (StringObjectInspector) arguments[0]; + intOI = (IntObjectInspector) arguments[1]; + + List<String> names = Arrays.asList("if_class","split_class","split"); + List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); + StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs); + + LOG.debug("done initializing GenericUDFGetSplits"); + return outputOI; + } + + public static class PlanFragment { + public JobConf jc; + public TezWork work; + public Schema schema; + + public PlanFragment(TezWork work, Schema schema, JobConf jc) { + this.work = work; + this.schema = schema; + this.jc = jc; + } + } + + @Override + public void process(Object[] arguments) throws HiveException { + + String query = stringOI.getPrimitiveJavaObject(arguments[0]); + int num = intOI.get(arguments[1]); + + PlanFragment fragment = createPlanFragment(query, num); + TezWork tezWork = fragment.work; + Schema schema = fragment.schema; + + try { + for (InputSplit s: getSplits(jc, num, tezWork, schema)) { + Object[] os = new Object[3]; + os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME; + os[1] = s.getClass().getName(); + bos.reset(); + s.write(dos); + byte[] frozen = bos.toByteArray(); + os[2] = frozen; + forward(os); + } + } catch(Exception e) { + throw new HiveException(e); + } + } + + public PlanFragment createPlanFragment(String query, int num) throws HiveException { + + HiveConf conf = new HiveConf(SessionState.get().getConf()); + HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap"); + + String originalMode = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS, true); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS, true); + + try { + jc = DagUtils.getInstance().createConfiguration(conf); + } catch (IOException e) { + throw new HiveException(e); + } + + Driver driver = new Driver(conf); + CommandProcessorResponse cpr; + + LOG.info("setting fetch.task.conversion to none and query file format to \"" + + LlapOutputFormat.class.getName()+"\""); + + cpr = driver.compileAndRespond(query); + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to compile query: "+cpr.getException()); + } + + QueryPlan plan = driver.getPlan(); + List<Task<? extends Serializable>> roots = plan.getRootTasks(); + Schema schema = plan.getResultSchema(); + + if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { + throw new HiveException("Was expecting a single TezTask."); + } + + TezWork tezWork = ((TezTask)roots.get(0)).getWork(); + + if (tezWork.getAllWork().size() != 1) { + + String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", ""); + + String ctas = "create temporary table "+tableName+" as "+query; + LOG.info("CTAS: "+ctas); + + try { + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, originalMode); + cpr = driver.run(ctas, false); + } catch(CommandNeedRetryException e) { + throw new HiveException(e); + } + + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to create temp table: " + cpr.getException()); + } + + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap"); + 
query = "select * from " + tableName; + cpr = driver.compileAndRespond(query); + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to compile query: "+cpr.getException()); + } + + plan = driver.getPlan(); + roots = plan.getRootTasks(); + schema = plan.getResultSchema(); + + if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { + throw new HiveException("Was expecting a single TezTask."); + } + + tezWork = ((TezTask)roots.get(0)).getWork(); + } + + return new PlanFragment(tezWork, schema, jc); + } + + public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema) + throws IOException { + + DAG dag = DAG.create(work.getName()); + dag.setCredentials(job.getCredentials()); + // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag); + + DagUtils utils = DagUtils.getInstance(); + Context ctx = new Context(job); + MapWork mapWork = (MapWork) work.getAllWork().get(0); + // bunch of things get setup in the context based on conf but we need only the MR tmp directory + // for the following method. + JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork); + Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job); + FileSystem fs = scratchDir.getFileSystem(job); + try { + LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job); + Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr, + new ArrayList<LocalResource>(), fs, ctx, false, work, + work.getVertexType(mapWork)); + String vertexName = wx.getName(); + dag.addVertex(wx); + utils.addCredentials(mapWork, dag); + + + // we have the dag now proceed to get the splits: + HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null); + Preconditions.checkState(HiveConf.getBoolVar(wxConf, + HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS)); + Preconditions.checkState(HiveConf.getBoolVar(wxConf, + HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS)); + splitGenerator.initializeSplitGenerator(wxConf, mapWork); + List<Event> eventList = splitGenerator.initialize(); + + // hack - just serializing with kryo for now. This needs to be done properly + InputSplit[] result = new InputSplit[eventList.size() - 1]; + DataOutputBuffer dob = new DataOutputBuffer(); + + InputConfigureVertexTasksEvent configureEvent + = (InputConfigureVertexTasksEvent) eventList.get(0); + + List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints(); + + Preconditions.checkState(hints.size() == eventList.size() - 1); + + if (LOG.isDebugEnabled()) { + LOG.debug("NumEvents=" + eventList.size()); + LOG.debug("NumSplits=" + result.length); + } + + ApplicationId fakeApplicationId + = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0); + + LOG.info("Number of splits: " + (eventList.size() - 1)); + for (int i = 0; i < eventList.size() - 1; i++) { + + TaskSpec taskSpec = + new TaskSpecBuilder().constructTaskSpec(dag, vertexName, + eventList.size() - 1, fakeApplicationId, i); + + SubmitWorkInfo submitWorkInfo = + new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis()); + EventMetaData sourceMetaData = + new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName, + "NULL_VERTEX", null); + EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx); + + // Creating the TezEvent here itself, since it's easy to serialize. 
+ Event event = eventList.get(i + 1); + TaskLocationHint hint = hints.get(i); + Set<String> hosts = hint.getHosts(); + if (hosts.size() != 1) { + LOG.warn("Bad # of locations: " + hosts.size()); + } + SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()]; + + int j = 0; + for (String host : hosts) { + locations[j++] = new SplitLocationInfo(host, false); + } + TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis()); + tezEvent.setDestinationInfo(destinationMetaInfo); + + bos.reset(); + dob.reset(); + tezEvent.write(dob); + + byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo); + + result[i] = new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema); + } + return result; + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Returns a local resource representing a jar. This resource will be used to execute the plan on + * the cluster. + * + * @param localJarPath + * Local path to the jar to be localized. + * @return LocalResource corresponding to the localized hive exec resource. + * @throws IOException + * when any file system related call fails. + * @throws LoginException + * when we are unable to determine the user. + * @throws URISyntaxException + * when current jar location cannot be determined. + */ + private LocalResource createJarLocalResource(String localJarPath, DagUtils utils, + Configuration conf) + throws IOException, LoginException, IllegalArgumentException, FileNotFoundException { + FileStatus destDirStatus = utils.getHiveJarDirectory(conf); + assert destDirStatus != null; + Path destDirPath = destDirStatus.getPath(); + + Path localFile = new Path(localJarPath); + String sha = getSha(localFile, conf); + + String destFileName = localFile.getName(); + + // Now, try to find the file based on SHA and name. Currently we require exact name match. + // We could also allow cutting off versions and other stuff provided that SHA matches... + destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha + + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName); + + // TODO: if this method is ever called on more than one jar, getting the dir and the + // list need to be refactored out to be done only once. + Path destFile = new Path(destDirPath.toString() + "/" + destFileName); + return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); + } + + private String getSha(Path localFile, Configuration conf) + throws IOException, IllegalArgumentException { + InputStream is = null; + try { + FileSystem localFs = FileSystem.getLocal(conf); + is = localFs.open(localFile); + return DigestUtils.sha256Hex(is); + } finally { + if (is != null) { + is.close(); + } + } + } + + @Override + public void close() throws HiveException { + } +} diff --git ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java index 5cabb6a..5db8c48 100644 --- ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java +++ ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java @@ -17,7 +17,7 @@ // Proxy class within the tez.api package to access package private methods. 
public class TaskSpecBuilder { - public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId) { + public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId, int index) { Vertex vertex = dag.getVertex(vertexName); ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor(); List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs = @@ -43,7 +43,7 @@ public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, App TezDAGID dagId = TezDAGID.getInstance(appId, 0); TezVertexID vertexId = TezVertexID.getInstance(dagId, 0); - TezTaskID taskId = TezTaskID.getInstance(vertexId, 0); + TezTaskID taskId = TezTaskID.getInstance(vertexId, index); TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0); return new TaskSpec(taskAttemptId, dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs, outputSpecs, null); } diff --git ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java index c49231c..7b516fe 100644 --- ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java @@ -79,46 +79,52 @@ public void tearDown() throws IOException, InterruptedException { @Test public void testValues() throws Exception { JobConf job = new JobConf(); - job.set(LlapOutputFormat.LLAP_OF_ID_KEY, "foobar"); - LlapOutputFormat format = new LlapOutputFormat(); - HiveConf conf = new HiveConf(); - Socket socket = new Socket("localhost", - conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)); + for (int k = 0; k < 5; ++k) { + String id = "foobar"+k; + job.set(LlapOutputFormat.LLAP_OF_ID_KEY, id); + LlapOutputFormat format = new LlapOutputFormat(); - LOG.debug("Socket connected"); + HiveConf conf = new HiveConf(); + Socket socket = new Socket("localhost", + conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)); - socket.getOutputStream().write("foobar".getBytes()); - socket.getOutputStream().write(0); - socket.getOutputStream().flush(); + LOG.debug("Socket connected"); - Thread.sleep(3000); + socket.getOutputStream().write(id.getBytes()); + socket.getOutputStream().write(0); + socket.getOutputStream().flush(); - LOG.debug("Data written"); + Thread.sleep(3000); - RecordWriter writer = format.getRecordWriter(null, job, null, null); - Text text = new Text(); + LOG.debug("Data written"); - LOG.debug("Have record writer"); + RecordWriter writer = format.getRecordWriter(null, job, null, null); + Text text = new Text(); - for (int i = 0; i < 10; ++i) { - text.set(""+i); - writer.write(NullWritable.get(),text); - } + LOG.debug("Have record writer"); - writer.close(null); + for (int i = 0; i < 10; ++i) { + text.set(""+i); + writer.write(NullWritable.get(),text); + } - InputStream in = socket.getInputStream(); - RecordReader reader = new LlapRecordReader(in, null, Text.class); + writer.close(null); - LOG.debug("Have record reader"); + InputStream in = socket.getInputStream(); + RecordReader reader = new LlapRecordReader(in, null, Text.class); - int count = 0; - while(reader.next(NullWritable.get(), text)) { - LOG.debug(text.toString()); - count++; - } + LOG.debug("Have record reader"); + + int count = 0; + while(reader.next(NullWritable.get(), text)) { + LOG.debug(text.toString()); + count++; + } - Assert.assertEquals(count,10); + reader.close(); + + Assert.assertEquals(count,10); + } } } diff --git ql/src/test/queries/clientpositive/udf_get_splits.q 
ql/src/test/queries/clientpositive/udf_get_splits.q deleted file mode 100644 index 70400e8..0000000 --- ql/src/test/queries/clientpositive/udf_get_splits.q +++ /dev/null @@ -1,6 +0,0 @@ -set hive.fetch.task.conversion=more; - -DESCRIBE FUNCTION get_splits; -DESCRIBE FUNCTION EXTENDED get_splits; - -select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t; diff --git ql/src/test/queries/clientpositive/udtf_get_splits.q ql/src/test/queries/clientpositive/udtf_get_splits.q new file mode 100644 index 0000000..f378dca --- /dev/null +++ ql/src/test/queries/clientpositive/udtf_get_splits.q @@ -0,0 +1,43 @@ +set hive.fetch.task.conversion=more; +set hive.mapred.mode=nonstrict; +set mapred.max.split.size=100; +set mapred.min.split.size.per.node=100; +set mapred.min.split.size.per.rack=100; +set mapred.max.split.size=100; +set tez.grouping.max-size=100; +set tez.grouping.min-size=100; + +DESCRIBE FUNCTION get_splits; +DESCRIBE FUNCTION execute_splits; + +select r1, r2, floor(length(r3)/100000) +from + (select + get_splits( + "select key, count(*) from srcpart where key % 2 = 0 group by key", + 5) as (r1, r2, r3)) t; + +select r1, r2, floor(length(r3)/100000) +from + (select + get_splits( + "select key from srcpart where key % 2 = 0", + 5) as (r1, r2, r3)) t; + +show tables; + +select r1, r2 +from + (select + execute_splits( + "select key from srcpart where key % 2 = 0", + 1) as (r1, r2)) t; + +select r1, r2 +from + (select + execute_splits( + "select key from srcpart where key % 2 = 0", + 5) as (r1, r2)) t; + +select count(*) from (select key from srcpart where key % 2 = 0) t; diff --git ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out new file mode 100644 index 0000000..2f17a91 --- /dev/null +++ ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out @@ -0,0 +1,2130 @@ +PREHOOK: query: DESCRIBE FUNCTION get_splits +PREHOOK: type: DESCFUNCTION +POSTHOOK: query: DESCRIBE FUNCTION get_splits +POSTHOOK: type: DESCFUNCTION +get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string. +PREHOOK: query: DESCRIBE FUNCTION execute_splits +PREHOOK: type: DESCFUNCTION +POSTHOOK: query: DESCRIBE FUNCTION execute_splits +POSTHOOK: type: DESCFUNCTION +execute_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string. 
+PREHOOK: query: select r1, r2, floor(length(r3)/100000) +from + (select + get_splits( + "select key, count(*) from srcpart where key % 2 = 0 group by key", + 5) as (r1, r2, r3)) t +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select r1, r2, floor(length(r3)/100000) +from + (select + get_splits( + "select key, count(*) from srcpart where key % 2 = 0 group by key", + 5) as (r1, r2, r3)) t +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +PREHOOK: query: select r1, r2, floor(length(r3)/100000) +from + (select + get_splits( + "select key, count(*) from srcpart where key % 2 = 0 group by key", + 5) as (r1, r2, r3)) t +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: Output: database:default +PREHOOK: Output: default@#### A masked pattern was here #### +POSTHOOK: query: select r1, r2, floor(length(r3)/100000) +from + (select + get_splits( + "select key, count(*) from srcpart where key % 2 = 0 group by key", + 5) as (r1, r2, r3)) t +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +POSTHOOK: Output: database:default +POSTHOOK: Output: default@#### A masked pattern was here #### +org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 +PREHOOK: query: select r1, r2, floor(length(r3)/100000) +from + (select + get_splits( + "select key from srcpart where key % 2 = 0", + 5) as (r1, r2, r3)) t +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select r1, r2, floor(length(r3)/100000) +from + (select + get_splits( + "select key from srcpart where key % 2 = 0", + 5) as (r1, r2, r3)) t +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 +org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 +org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 +org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 +PREHOOK: query: show tables +PREHOOK: type: SHOWTABLES +PREHOOK: Input: database:default +POSTHOOK: query: show tables +POSTHOOK: type: SHOWTABLES +POSTHOOK: Input: database:default +alltypesorc +cbo_t1 +cbo_t2 +cbo_t3 +lineitem +part +src +src1 +src_cbo +src_json +src_sequencefile +src_thrift +srcbucket +srcbucket2 +srcpart +#### A masked pattern was here #### +PREHOOK: query: select r1, r2 +from + (select + execute_splits( + "select key from srcpart where key % 2 = 0", + 1) as (r1, r2)) t +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select r1, r2 +from + (select + execute_splits( + "select key from srcpart where key % 2 = 0", + 1) as (r1, r2)) t +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +1 238 +1 86 +1 278 +1 98 +1 484 +1 150 +1 224 +1 66 +1 128 
+1 146 +1 406 +1 374 +1 152 +1 82 +1 166 +1 430 +1 252 +1 292 +1 338 +1 446 +1 394 +1 482 +1 174 +1 494 +1 466 +1 208 +1 174 +1 396 +1 162 +1 266 +1 342 +1 0 +1 128 +1 316 +1 302 +1 438 +1 170 +1 20 +1 378 +1 92 +1 72 +1 4 +1 280 +1 208 +1 356 +1 382 +1 498 +1 386 +1 192 +1 286 +1 176 +1 54 +1 138 +1 216 +1 430 +1 278 +1 176 +1 318 +1 332 +1 180 +1 284 +1 12 +1 230 +1 260 +1 404 +1 384 +1 272 +1 138 +1 84 +1 348 +1 466 +1 58 +1 8 +1 230 +1 208 +1 348 +1 24 +1 172 +1 42 +1 158 +1 496 +1 0 +1 322 +1 468 +1 454 +1 100 +1 298 +1 418 +1 96 +1 26 +1 230 +1 120 +1 404 +1 436 +1 156 +1 468 +1 308 +1 196 +1 288 +1 98 +1 282 +1 318 +1 318 +1 470 +1 316 +1 0 +1 490 +1 364 +1 118 +1 134 +1 282 +1 138 +1 238 +1 118 +1 72 +1 90 +1 10 +1 306 +1 224 +1 242 +1 392 +1 272 +1 242 +1 452 +1 226 +1 402 +1 396 +1 58 +1 336 +1 168 +1 34 +1 472 +1 322 +1 498 +1 160 +1 42 +1 430 +1 458 +1 78 +1 76 +1 492 +1 218 +1 228 +1 138 +1 30 +1 64 +1 468 +1 76 +1 74 +1 342 +1 230 +1 368 +1 296 +1 216 +1 344 +1 274 +1 116 +1 256 +1 70 +1 480 +1 288 +1 244 +1 438 +1 128 +1 432 +1 202 +1 316 +1 280 +1 2 +1 80 +1 44 +1 104 +1 466 +1 366 +1 406 +1 190 +1 406 +1 114 +1 258 +1 90 +1 262 +1 348 +1 424 +1 12 +1 396 +1 164 +1 454 +1 478 +1 298 +1 164 +1 424 +1 382 +1 70 +1 480 +1 24 +1 104 +1 70 +1 438 +1 414 +1 200 +1 360 +1 248 +1 444 +1 120 +1 230 +1 478 +1 178 +1 468 +1 310 +1 460 +1 480 +1 136 +1 172 +1 214 +1 462 +1 406 +1 454 +1 384 +1 256 +1 26 +1 134 +1 384 +1 18 +1 462 +1 492 +1 100 +1 298 +1 498 +1 146 +1 458 +1 362 +1 186 +1 348 +1 18 +1 344 +1 84 +1 28 +1 448 +1 152 +1 348 +1 194 +1 414 +1 222 +1 126 +1 90 +1 400 +1 200 +2 238 +2 86 +2 278 +2 98 +2 484 +2 150 +2 224 +2 66 +2 128 +2 146 +2 406 +2 374 +2 152 +2 82 +2 166 +2 430 +2 252 +2 292 +2 338 +2 446 +2 394 +2 482 +2 174 +2 494 +2 466 +2 208 +2 174 +2 396 +2 162 +2 266 +2 342 +2 0 +2 128 +2 316 +2 302 +2 438 +2 170 +2 20 +2 378 +2 92 +2 72 +2 4 +2 280 +2 208 +2 356 +2 382 +2 498 +2 386 +2 192 +2 286 +2 176 +2 54 +2 138 +2 216 +2 430 +2 278 +2 176 +2 318 +2 332 +2 180 +2 284 +2 12 +2 230 +2 260 +2 404 +2 384 +2 272 +2 138 +2 84 +2 348 +2 466 +2 58 +2 8 +2 230 +2 208 +2 348 +2 24 +2 172 +2 42 +2 158 +2 496 +2 0 +2 322 +2 468 +2 454 +2 100 +2 298 +2 418 +2 96 +2 26 +2 230 +2 120 +2 404 +2 436 +2 156 +2 468 +2 308 +2 196 +2 288 +2 98 +2 282 +2 318 +2 318 +2 470 +2 316 +2 0 +2 490 +2 364 +2 118 +2 134 +2 282 +2 138 +2 238 +2 118 +2 72 +2 90 +2 10 +2 306 +2 224 +2 242 +2 392 +2 272 +2 242 +2 452 +2 226 +2 402 +2 396 +2 58 +2 336 +2 168 +2 34 +2 472 +2 322 +2 498 +2 160 +2 42 +2 430 +2 458 +2 78 +2 76 +2 492 +2 218 +2 228 +2 138 +2 30 +2 64 +2 468 +2 76 +2 74 +2 342 +2 230 +2 368 +2 296 +2 216 +2 344 +2 274 +2 116 +2 256 +2 70 +2 480 +2 288 +2 244 +2 438 +2 128 +2 432 +2 202 +2 316 +2 280 +2 2 +2 80 +2 44 +2 104 +2 466 +2 366 +2 406 +2 190 +2 406 +2 114 +2 258 +2 90 +2 262 +2 348 +2 424 +2 12 +2 396 +2 164 +2 454 +2 478 +2 298 +2 164 +2 424 +2 382 +2 70 +2 480 +2 24 +2 104 +2 70 +2 438 +2 414 +2 200 +2 360 +2 248 +2 444 +2 120 +2 230 +2 478 +2 178 +2 468 +2 310 +2 460 +2 480 +2 136 +2 172 +2 214 +2 462 +2 406 +2 454 +2 384 +2 256 +2 26 +2 134 +2 384 +2 18 +2 462 +2 492 +2 100 +2 298 +2 498 +2 146 +2 458 +2 362 +2 186 +2 348 +2 18 +2 344 +2 84 +2 28 +2 448 +2 152 +2 348 +2 194 +2 414 +2 222 +2 126 +2 90 +2 400 +2 200 +3 238 +3 86 +3 278 +3 98 +3 484 +3 150 +3 224 +3 66 +3 128 +3 146 +3 406 +3 374 +3 152 +3 82 +3 166 +3 430 +3 252 +3 292 +3 338 +3 446 +3 394 +3 482 +3 174 +3 494 +3 466 +3 208 +3 174 +3 396 +3 162 +3 266 +3 342 +3 0 +3 128 +3 316 +3 302 +3 438 +3 170 +3 20 +3 
378 +3 92 +3 72 +3 4 +3 280 +3 208 +3 356 +3 382 +3 498 +3 386 +3 192 +3 286 +3 176 +3 54 +3 138 +3 216 +3 430 +3 278 +3 176 +3 318 +3 332 +3 180 +3 284 +3 12 +3 230 +3 260 +3 404 +3 384 +3 272 +3 138 +3 84 +3 348 +3 466 +3 58 +3 8 +3 230 +3 208 +3 348 +3 24 +3 172 +3 42 +3 158 +3 496 +3 0 +3 322 +3 468 +3 454 +3 100 +3 298 +3 418 +3 96 +3 26 +3 230 +3 120 +3 404 +3 436 +3 156 +3 468 +3 308 +3 196 +3 288 +3 98 +3 282 +3 318 +3 318 +3 470 +3 316 +3 0 +3 490 +3 364 +3 118 +3 134 +3 282 +3 138 +3 238 +3 118 +3 72 +3 90 +3 10 +3 306 +3 224 +3 242 +3 392 +3 272 +3 242 +3 452 +3 226 +3 402 +3 396 +3 58 +3 336 +3 168 +3 34 +3 472 +3 322 +3 498 +3 160 +3 42 +3 430 +3 458 +3 78 +3 76 +3 492 +3 218 +3 228 +3 138 +3 30 +3 64 +3 468 +3 76 +3 74 +3 342 +3 230 +3 368 +3 296 +3 216 +3 344 +3 274 +3 116 +3 256 +3 70 +3 480 +3 288 +3 244 +3 438 +3 128 +3 432 +3 202 +3 316 +3 280 +3 2 +3 80 +3 44 +3 104 +3 466 +3 366 +3 406 +3 190 +3 406 +3 114 +3 258 +3 90 +3 262 +3 348 +3 424 +3 12 +3 396 +3 164 +3 454 +3 478 +3 298 +3 164 +3 424 +3 382 +3 70 +3 480 +3 24 +3 104 +3 70 +3 438 +3 414 +3 200 +3 360 +3 248 +3 444 +3 120 +3 230 +3 478 +3 178 +3 468 +3 310 +3 460 +3 480 +3 136 +3 172 +3 214 +3 462 +3 406 +3 454 +3 384 +3 256 +3 26 +3 134 +3 384 +3 18 +3 462 +3 492 +3 100 +3 298 +3 498 +3 146 +3 458 +3 362 +3 186 +3 348 +3 18 +3 344 +3 84 +3 28 +3 448 +3 152 +3 348 +3 194 +3 414 +3 222 +3 126 +3 90 +3 400 +3 200 +4 238 +4 86 +4 278 +4 98 +4 484 +4 150 +4 224 +4 66 +4 128 +4 146 +4 406 +4 374 +4 152 +4 82 +4 166 +4 430 +4 252 +4 292 +4 338 +4 446 +4 394 +4 482 +4 174 +4 494 +4 466 +4 208 +4 174 +4 396 +4 162 +4 266 +4 342 +4 0 +4 128 +4 316 +4 302 +4 438 +4 170 +4 20 +4 378 +4 92 +4 72 +4 4 +4 280 +4 208 +4 356 +4 382 +4 498 +4 386 +4 192 +4 286 +4 176 +4 54 +4 138 +4 216 +4 430 +4 278 +4 176 +4 318 +4 332 +4 180 +4 284 +4 12 +4 230 +4 260 +4 404 +4 384 +4 272 +4 138 +4 84 +4 348 +4 466 +4 58 +4 8 +4 230 +4 208 +4 348 +4 24 +4 172 +4 42 +4 158 +4 496 +4 0 +4 322 +4 468 +4 454 +4 100 +4 298 +4 418 +4 96 +4 26 +4 230 +4 120 +4 404 +4 436 +4 156 +4 468 +4 308 +4 196 +4 288 +4 98 +4 282 +4 318 +4 318 +4 470 +4 316 +4 0 +4 490 +4 364 +4 118 +4 134 +4 282 +4 138 +4 238 +4 118 +4 72 +4 90 +4 10 +4 306 +4 224 +4 242 +4 392 +4 272 +4 242 +4 452 +4 226 +4 402 +4 396 +4 58 +4 336 +4 168 +4 34 +4 472 +4 322 +4 498 +4 160 +4 42 +4 430 +4 458 +4 78 +4 76 +4 492 +4 218 +4 228 +4 138 +4 30 +4 64 +4 468 +4 76 +4 74 +4 342 +4 230 +4 368 +4 296 +4 216 +4 344 +4 274 +4 116 +4 256 +4 70 +4 480 +4 288 +4 244 +4 438 +4 128 +4 432 +4 202 +4 316 +4 280 +4 2 +4 80 +4 44 +4 104 +4 466 +4 366 +4 406 +4 190 +4 406 +4 114 +4 258 +4 90 +4 262 +4 348 +4 424 +4 12 +4 396 +4 164 +4 454 +4 478 +4 298 +4 164 +4 424 +4 382 +4 70 +4 480 +4 24 +4 104 +4 70 +4 438 +4 414 +4 200 +4 360 +4 248 +4 444 +4 120 +4 230 +4 478 +4 178 +4 468 +4 310 +4 460 +4 480 +4 136 +4 172 +4 214 +4 462 +4 406 +4 454 +4 384 +4 256 +4 26 +4 134 +4 384 +4 18 +4 462 +4 492 +4 100 +4 298 +4 498 +4 146 +4 458 +4 362 +4 186 +4 348 +4 18 +4 344 +4 84 +4 28 +4 448 +4 152 +4 348 +4 194 +4 414 +4 222 +4 126 +4 90 +4 400 +4 200 +PREHOOK: query: select r1, r2 +from + (select + execute_splits( + "select key from srcpart where key % 2 = 0", + 5) as (r1, r2)) t +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select r1, r2 +from + (select + execute_splits( + "select key from srcpart where key % 2 = 0", + 5) as (r1, r2)) t +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was 
here #### +1 238 +1 86 +1 278 +1 98 +1 484 +1 150 +1 224 +1 66 +1 128 +1 146 +1 406 +1 374 +1 152 +1 82 +1 166 +1 430 +1 252 +1 292 +1 338 +1 446 +1 394 +1 482 +1 174 +1 494 +1 466 +1 208 +1 174 +1 396 +1 162 +1 266 +1 342 +1 0 +1 128 +1 316 +1 302 +1 438 +1 170 +1 20 +1 378 +1 92 +1 72 +1 4 +1 280 +1 208 +1 356 +1 382 +1 498 +1 386 +1 192 +1 286 +1 176 +1 54 +1 138 +1 216 +1 430 +1 278 +1 176 +1 318 +1 332 +1 180 +1 284 +1 12 +1 230 +1 260 +1 404 +1 384 +1 272 +1 138 +1 84 +1 348 +1 466 +1 58 +1 8 +1 230 +1 208 +1 348 +1 24 +1 172 +1 42 +1 158 +1 496 +1 0 +1 322 +1 468 +1 454 +1 100 +1 298 +1 418 +1 96 +1 26 +1 230 +1 120 +1 404 +1 436 +1 156 +1 468 +1 308 +1 196 +1 288 +1 98 +1 282 +1 318 +1 318 +1 470 +1 316 +1 0 +1 490 +1 364 +1 118 +1 134 +1 282 +1 138 +1 238 +1 118 +1 72 +1 90 +1 10 +1 306 +1 224 +1 242 +1 392 +1 272 +1 242 +1 452 +1 226 +1 402 +1 396 +1 58 +1 336 +1 168 +1 34 +1 472 +1 322 +1 498 +1 160 +1 42 +1 430 +1 458 +1 78 +1 76 +1 492 +1 218 +1 228 +1 138 +1 30 +1 64 +1 468 +1 76 +1 74 +1 342 +1 230 +1 368 +1 296 +1 216 +1 344 +1 274 +1 116 +1 256 +1 70 +1 480 +1 288 +1 244 +1 438 +1 128 +1 432 +1 202 +1 316 +1 280 +1 2 +1 80 +1 44 +1 104 +1 466 +1 366 +1 406 +1 190 +1 406 +1 114 +1 258 +1 90 +1 262 +1 348 +1 424 +1 12 +1 396 +1 164 +1 454 +1 478 +1 298 +1 164 +1 424 +1 382 +1 70 +1 480 +1 24 +1 104 +1 70 +1 438 +1 414 +1 200 +1 360 +1 248 +1 444 +1 120 +1 230 +1 478 +1 178 +1 468 +1 310 +1 460 +1 480 +1 136 +1 172 +1 214 +1 462 +1 406 +1 454 +1 384 +1 256 +1 26 +1 134 +1 384 +1 18 +1 462 +1 492 +1 100 +1 298 +1 498 +1 146 +1 458 +1 362 +1 186 +1 348 +1 18 +1 344 +1 84 +1 28 +1 448 +1 152 +1 348 +1 194 +1 414 +1 222 +1 126 +1 90 +1 400 +1 200 +2 238 +2 86 +2 278 +2 98 +2 484 +2 150 +2 224 +2 66 +2 128 +2 146 +2 406 +2 374 +2 152 +2 82 +2 166 +2 430 +2 252 +2 292 +2 338 +2 446 +2 394 +2 482 +2 174 +2 494 +2 466 +2 208 +2 174 +2 396 +2 162 +2 266 +2 342 +2 0 +2 128 +2 316 +2 302 +2 438 +2 170 +2 20 +2 378 +2 92 +2 72 +2 4 +2 280 +2 208 +2 356 +2 382 +2 498 +2 386 +2 192 +2 286 +2 176 +2 54 +2 138 +2 216 +2 430 +2 278 +2 176 +2 318 +2 332 +2 180 +2 284 +2 12 +2 230 +2 260 +2 404 +2 384 +2 272 +2 138 +2 84 +2 348 +2 466 +2 58 +2 8 +2 230 +2 208 +2 348 +2 24 +2 172 +2 42 +2 158 +2 496 +2 0 +2 322 +2 468 +2 454 +2 100 +2 298 +2 418 +2 96 +2 26 +2 230 +2 120 +2 404 +2 436 +2 156 +2 468 +2 308 +2 196 +2 288 +2 98 +2 282 +2 318 +2 318 +2 470 +2 316 +2 0 +2 490 +2 364 +2 118 +2 134 +2 282 +2 138 +2 238 +2 118 +2 72 +2 90 +2 10 +2 306 +2 224 +2 242 +2 392 +2 272 +2 242 +2 452 +2 226 +2 402 +2 396 +2 58 +2 336 +2 168 +2 34 +2 472 +2 322 +2 498 +2 160 +2 42 +2 430 +2 458 +2 78 +2 76 +2 492 +2 218 +2 228 +2 138 +2 30 +2 64 +2 468 +2 76 +2 74 +2 342 +2 230 +2 368 +2 296 +2 216 +2 344 +2 274 +2 116 +2 256 +2 70 +2 480 +2 288 +2 244 +2 438 +2 128 +2 432 +2 202 +2 316 +2 280 +2 2 +2 80 +2 44 +2 104 +2 466 +2 366 +2 406 +2 190 +2 406 +2 114 +2 258 +2 90 +2 262 +2 348 +2 424 +2 12 +2 396 +2 164 +2 454 +2 478 +2 298 +2 164 +2 424 +2 382 +2 70 +2 480 +2 24 +2 104 +2 70 +2 438 +2 414 +2 200 +2 360 +2 248 +2 444 +2 120 +2 230 +2 478 +2 178 +2 468 +2 310 +2 460 +2 480 +2 136 +2 172 +2 214 +2 462 +2 406 +2 454 +2 384 +2 256 +2 26 +2 134 +2 384 +2 18 +2 462 +2 492 +2 100 +2 298 +2 498 +2 146 +2 458 +2 362 +2 186 +2 348 +2 18 +2 344 +2 84 +2 28 +2 448 +2 152 +2 348 +2 194 +2 414 +2 222 +2 126 +2 90 +2 400 +2 200 +3 238 +3 86 +3 278 +3 98 +3 484 +3 150 +3 224 +3 66 +3 128 +3 146 +3 406 +3 374 +3 152 +3 82 +3 166 +3 430 +3 252 +3 292 +3 338 +3 446 +3 394 +3 482 +3 174 +3 494 +3 466 +3 208 +3 174 +3 396 +3 
162 +3 266 +3 342 +3 0 +3 128 +3 316 +3 302 +3 438 +3 170 +3 20 +3 378 +3 92 +3 72 +3 4 +3 280 +3 208 +3 356 +3 382 +3 498 +3 386 +3 192 +3 286 +3 176 +3 54 +3 138 +3 216 +3 430 +3 278 +3 176 +3 318 +3 332 +3 180 +3 284 +3 12 +3 230 +3 260 +3 404 +3 384 +3 272 +3 138 +3 84 +3 348 +3 466 +3 58 +3 8 +3 230 +3 208 +3 348 +3 24 +3 172 +3 42 +3 158 +3 496 +3 0 +3 322 +3 468 +3 454 +3 100 +3 298 +3 418 +3 96 +3 26 +3 230 +3 120 +3 404 +3 436 +3 156 +3 468 +3 308 +3 196 +3 288 +3 98 +3 282 +3 318 +3 318 +3 470 +3 316 +3 0 +3 490 +3 364 +3 118 +3 134 +3 282 +3 138 +3 238 +3 118 +3 72 +3 90 +3 10 +3 306 +3 224 +3 242 +3 392 +3 272 +3 242 +3 452 +3 226 +3 402 +3 396 +3 58 +3 336 +3 168 +3 34 +3 472 +3 322 +3 498 +3 160 +3 42 +3 430 +3 458 +3 78 +3 76 +3 492 +3 218 +3 228 +3 138 +3 30 +3 64 +3 468 +3 76 +3 74 +3 342 +3 230 +3 368 +3 296 +3 216 +3 344 +3 274 +3 116 +3 256 +3 70 +3 480 +3 288 +3 244 +3 438 +3 128 +3 432 +3 202 +3 316 +3 280 +3 2 +3 80 +3 44 +3 104 +3 466 +3 366 +3 406 +3 190 +3 406 +3 114 +3 258 +3 90 +3 262 +3 348 +3 424 +3 12 +3 396 +3 164 +3 454 +3 478 +3 298 +3 164 +3 424 +3 382 +3 70 +3 480 +3 24 +3 104 +3 70 +3 438 +3 414 +3 200 +3 360 +3 248 +3 444 +3 120 +3 230 +3 478 +3 178 +3 468 +3 310 +3 460 +3 480 +3 136 +3 172 +3 214 +3 462 +3 406 +3 454 +3 384 +3 256 +3 26 +3 134 +3 384 +3 18 +3 462 +3 492 +3 100 +3 298 +3 498 +3 146 +3 458 +3 362 +3 186 +3 348 +3 18 +3 344 +3 84 +3 28 +3 448 +3 152 +3 348 +3 194 +3 414 +3 222 +3 126 +3 90 +3 400 +3 200 +4 238 +4 86 +4 278 +4 98 +4 484 +4 150 +4 224 +4 66 +4 128 +4 146 +4 406 +4 374 +4 152 +4 82 +4 166 +4 430 +4 252 +4 292 +4 338 +4 446 +4 394 +4 482 +4 174 +4 494 +4 466 +4 208 +4 174 +4 396 +4 162 +4 266 +4 342 +4 0 +4 128 +4 316 +4 302 +4 438 +4 170 +4 20 +4 378 +4 92 +4 72 +4 4 +4 280 +4 208 +4 356 +4 382 +4 498 +4 386 +4 192 +4 286 +4 176 +4 54 +4 138 +4 216 +4 430 +4 278 +4 176 +4 318 +4 332 +4 180 +4 284 +4 12 +4 230 +4 260 +4 404 +4 384 +4 272 +4 138 +4 84 +4 348 +4 466 +4 58 +4 8 +4 230 +4 208 +4 348 +4 24 +4 172 +4 42 +4 158 +4 496 +4 0 +4 322 +4 468 +4 454 +4 100 +4 298 +4 418 +4 96 +4 26 +4 230 +4 120 +4 404 +4 436 +4 156 +4 468 +4 308 +4 196 +4 288 +4 98 +4 282 +4 318 +4 318 +4 470 +4 316 +4 0 +4 490 +4 364 +4 118 +4 134 +4 282 +4 138 +4 238 +4 118 +4 72 +4 90 +4 10 +4 306 +4 224 +4 242 +4 392 +4 272 +4 242 +4 452 +4 226 +4 402 +4 396 +4 58 +4 336 +4 168 +4 34 +4 472 +4 322 +4 498 +4 160 +4 42 +4 430 +4 458 +4 78 +4 76 +4 492 +4 218 +4 228 +4 138 +4 30 +4 64 +4 468 +4 76 +4 74 +4 342 +4 230 +4 368 +4 296 +4 216 +4 344 +4 274 +4 116 +4 256 +4 70 +4 480 +4 288 +4 244 +4 438 +4 128 +4 432 +4 202 +4 316 +4 280 +4 2 +4 80 +4 44 +4 104 +4 466 +4 366 +4 406 +4 190 +4 406 +4 114 +4 258 +4 90 +4 262 +4 348 +4 424 +4 12 +4 396 +4 164 +4 454 +4 478 +4 298 +4 164 +4 424 +4 382 +4 70 +4 480 +4 24 +4 104 +4 70 +4 438 +4 414 +4 200 +4 360 +4 248 +4 444 +4 120 +4 230 +4 478 +4 178 +4 468 +4 310 +4 460 +4 480 +4 136 +4 172 +4 214 +4 462 +4 406 +4 454 +4 384 +4 256 +4 26 +4 134 +4 384 +4 18 +4 462 +4 492 +4 100 +4 298 +4 498 +4 146 +4 458 +4 362 +4 186 +4 348 +4 18 +4 344 +4 84 +4 28 +4 448 +4 152 +4 348 +4 194 +4 414 +4 222 +4 126 +4 90 +4 400 +4 200 +PREHOOK: query: select count(*) from (select key from srcpart where key % 2 = 0) t +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +POSTHOOK: 
query: select count(*) from (select key from srcpart where key % 2 = 0) t +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +988