Index: build-common.xml
===================================================================
--- build-common.xml	(revision 718182)
+++ build-common.xml	(working copy)
@@ -86,6 +86,9 @@
+
+
+
@@ -215,7 +218,7 @@
-
+
Index: build.xml
===================================================================
--- build.xml	(revision 718182)
+++ build.xml	(working copy)
@@ -67,6 +67,9 @@
+
+
+
@@ -89,6 +92,9 @@
+
+
+
Index: service/scripts/env.sh
===================================================================
--- service/scripts/env.sh	(revision 0)
+++ service/scripts/env.sh	(revision 0)
@@ -0,0 +1,3 @@
+export HADOOP_HOME=/Users/michim/Documents/workspace/hadoop/build/hadoop-0.20.0-dev
+export HIVE_HOME=/Users/michim/Documents/workspace/hadoop/build/contrib/hive/dist
+export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home
Index: service/scripts/start_hive_thrift_server.sh
===================================================================
--- service/scripts/start_hive_thrift_server.sh	(revision 0)
+++ service/scripts/start_hive_thrift_server.sh	(revision 0)
@@ -0,0 +1,50 @@
+#!/bin/sh
+
+# [ x == y ] is a bashism; use the portable = under #!/bin/sh
+if [ "$HADOOP_HOME" = '' ]; then
+  echo "Need to set HADOOP_HOME"
+  exit 1
+fi
+
+if [ "$HIVE_HOME" = '' ]; then
+  echo "Need to set HIVE_HOME"
+  exit 1
+fi
+
+if [ "$JAVA_HOME" = '' ]; then
+  echo "Need to set JAVA_HOME"
+  exit 1
+fi
+CLASSPATH=
+
+# the dist lib libraries
+for f in /usr/local/fbprojects/hive.metastore/lib/*.jar ; do
+  CLASSPATH=$CLASSPATH:$f
+done
+
+# the hadoop libraries
+for f in /mnt/hive/stable/cluster/*.jar ; do
+  CLASSPATH=$CLASSPATH:$f
+done
+
+# the apache libraries
+for f in /mnt/hive/stable/cluster/lib/*.jar ; do
+  CLASSPATH=$CLASSPATH:$f
+done
+
+# for now, the fb_hive libraries
+for f in /mnt/hive/stable/lib/hive/*.jar ; do
+  CLASSPATH=$CLASSPATH:$f
+done
+
+for f in $HADOOP_HOME/lib/*.jar $HADOOP_HOME/*.jar ; do
+  CLASSPATH=$CLASSPATH:$f
+done
+
+for f in $HIVE_HOME/lib/*.jar ; do
+  CLASSPATH=$CLASSPATH:$f
+done
+
+CLASSPATH=$CLASSPATH:$HIVE_HOME/conf
+echo $CLASSPATH
+
+$JAVA_HOME/bin/java -Dcom.sun.management.jmxremote -Djava.library.path=/mnt/hive/production/cluster/lib/native/Linux-amd64-64/ \
+  -cp $CLASSPATH org.apache.hadoop.hive.service.HiveServer 10000
Index: service/scripts/hive.service_daemon
===================================================================
--- service/scripts/hive.service_daemon	(revision 0)
+++ service/scripts/hive.service_daemon	(revision 0)
@@ -0,0 +1,17 @@
+#!/usr/bin/python
+
+import os, sys
+sys.path.append('/usr/local/fbprojects/fb303/scripts')
+
+import fb303_scripts
+
+if (len(sys.argv) > 1):
+    args = sys.argv[1]
+else:
+    args = ''
+
+fb303_scripts.daemonize_with_restarts('hive.metastore',
+                                      False,
+                                      binary_name = 'start_meta_store_thrift_server.sh',
+                                      libs = '',
+                                      args = args)
Index: service/scripts/hive.service_ctrl
===================================================================
--- service/scripts/hive.service_ctrl	(revision 0)
+++ service/scripts/hive.service_ctrl	(revision 0)
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+# chkconfig: 2345 90 20
+# description: hive metastore
+
+import sys
+import fb303_scripts
+from fb303_scripts import fb303_simple_mgmt
+
+# thrift python packages need to be installed
+import thrift
+from thrift import protocol, transport
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.transport import *
+
+if (len(sys.argv) > 2):
+    port = int(sys.argv[2])
+else:
+    port = 9082
+
+if (len(sys.argv) > 1):
+    retval = fb303_simple_mgmt.service_ctrl('hive.metastore',
+                                            port,
+                                            sys.argv[1],
+                                            binary_name = "start_meta_store_thrift_server.sh",
+                                            trans_factory = thrift.transport.TTransport.TBufferedTransportFactory(),
+                                            prot_factory = TBinaryProtocol.TBinaryProtocolFactory())
+    sys.exit(retval)
+else:
+    print 'metastore_ctrl requires a command: start, stop, restart, reload, force-reload, status, or version'
+    sys.exit(2)
Index: service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java
===================================================================
--- service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java	(revision 0)
+++ service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java	(revision 0)
@@ -0,0 +1,113 @@
+package org.apache.hadoop.hive.service;
+
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.service.HiveInterface;
+import org.apache.hadoop.hive.service.HiveClient;
+import org.apache.hadoop.hive.service.HiveServer;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.TTransport;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class TestHiveServer extends TestCase {
+
+  private HiveInterface client;
+  private final static String host = "localhost";
+  private final static int port = 10000;
+  private Path dataFilePath;
+
+  private static String tableName = "testhivedrivertable";
+  private HiveConf conf;
+
+  public TestHiveServer(String name) {
+    super(name);
+    conf = new HiveConf(TestHiveServer.class);
+    String dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    dataFilePath = new Path(dataFileDir, "kv1.txt");
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  public void testEmbeddedExecute() throws Exception {
+    client = new HiveServer.HiveServerHandler();
+    try {
+      client.execute("drop table " + tableName);
+    } catch (Exception ex) {
+    }
+
+    client.execute("create table " + tableName + " (num int)");
+    client.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName);
+    client.execute("select count(1) from " + tableName);
+    String row = client.fetchOne();
+    assertEquals("500", row);
+    String schema = client.getSchema();
+    assertEquals("struct result { 0: bigint, }", schema);
+    client.execute("drop table " + tableName);
+  }
+
+  // prefixed "notest" so JUnit skips it; requires a server on localhost:10000
+  public void notestExecute() throws Exception {
+    TTransport transport = new TSocket(host, port);
+    TProtocol protocol = new TBinaryProtocol(transport);
+    client = new HiveClient(protocol);
+    transport.open();
+    try {
+      client.execute("drop table " + tableName);
+    } catch (Exception ex) {
+    }
+
+    client.execute("create table " + tableName + " (num int)");
+    client.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName);
+    client.execute("select count(1) from " + tableName);
+    String row = client.fetchOne();
+    assertEquals("500", row);
+    client.execute("drop table " + tableName);
+    transport.close();
+  }
+
+  /**
+   * Test metastore call
+   */
+  public void testEmbeddedMetastore() throws Exception {
+    client = new HiveServer.HiveServerHandler();
+    try {
+      client.execute("drop table " + tableName);
+    } catch (Exception ex) {
+    }
+
+    client.execute("create table " + tableName + " (num int)");
+    List<String> tabs = client.get_tables("default", tableName);
+    assertEquals(tableName, tabs.get(0));
+    client.execute("drop table " + tableName);
+  }
+
+  /**
+   * Test metastore call
+   */
+  // prefixed "notest" so JUnit skips it; requires a server on localhost:10000
+  public void notestMetastore() throws Exception {
+    TTransport transport = new TSocket(host, port);
+    TProtocol protocol = new TBinaryProtocol(transport);
+    client = new HiveClient(protocol);
+    transport.open();
+    try {
+      client.execute("drop table " + tableName);
+    } catch (Exception ex) {
+    }
+
+    client.execute("create table " + tableName + " (num int)");
+    List<String> tabs = client.get_tables("default", tableName);
+    assertEquals(tableName, tabs.get(0));
+    client.execute("drop table " + tableName);
+    transport.close();
+  }
+
+}
Index: service/src/java/org/apache/hadoop/hive/service/HiveServer.java
===================================================================
--- service/src/java/org/apache/hadoop/hive/service/HiveServer.java	(revision 0)
+++ service/src/java/org/apache/hadoop/hive/service/HiveServer.java	(revision 0)
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.service.ThriftHive;
+import org.apache.hadoop.hive.service.ThriftHive.*;
+import org.apache.hadoop.hive.service.HiveServerException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.FacebookService;
+import com.facebook.fb303.fb_status;
+import com.facebook.thrift.TException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.server.TServer;
+import com.facebook.thrift.server.TThreadPoolServer;
+import com.facebook.thrift.transport.TServerSocket;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransportFactory;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.*;
+
+public class HiveServer extends ThriftHive {
+  private final static String VERSION = "0";
+
+  public static class HiveServerHandler extends HiveMetaStore.HMSHandler implements HiveInterface {
+
+    private Driver driver;
+    private SessionState session;
+    public static final Log LOG = LogFactory.getLog(HiveServer.class.getName());
+
+    public HiveServerHandler() throws MetaException {
+      super(HiveServer.class.getName());
+      session = new SessionState(new HiveConf(SessionState.class));
+      SessionState.start(session);
+      HiveConf conf = session.get().getConf();
+      conf.setVar(HiveConf.ConfVars.HADOOPBIN, System.getenv("HADOOP_HOME") + "/bin/hadoop");
+      session.in = null;
+      session.out = null;
+      session.err = null;
+      driver = new Driver();
+    }
+
+    public void execute(String query) throws HiveServerException, TException {
+      int rc = 0;
+      // TODO: driver.run should either return int or throw exception, not both.
+      try {
+        rc = driver.run(query);
+      } catch (Exception e) {
+        throw new HiveServerException();
+      }
+      if (rc != 0) {
+        throw new HiveServerException();
+      }
+    }
+
+    // TODO construct DynamicSerDe DDL
+    public String getSchema() throws HiveServerException, TException {
+      Vector<ColumnInfo> colInfos = driver.getResultColumnInfos();
+      if (colInfos == null) {
+        return "";
+      }
+      String schema = "struct result { ";
+      for (ColumnInfo colInfo : colInfos) {
+        schema += colInfo + ", ";
+      }
+      schema += "}";
+      return schema;
+    }
+
+    // TODO keep state across calls
+    public String fetchOne() throws HiveServerException, TException {
+      Vector<String> row = new Vector<String>();
+      if (!driver.getResults(row)) {
+        row.clear();
+      }
+      // guard against an empty result so get(0) cannot throw
+      if (row.isEmpty()) {
+        return "";
+      }
+      return row.get(0);
+    }
+
+    // TODO keep state across calls; numRows is not yet honored
+    public List<String> fetchN(int numRows) throws HiveServerException, TException {
+      Vector<String> row = new Vector<String>();
+      if (!driver.getResults(row)) {
+        row.clear();
+      }
+      return row;
+    }
+
+    // TODO keep state across calls
+    public List<String> fetchAll() throws HiveServerException, TException {
+      Vector<String> row = new Vector<String>();
+      if (!driver.getResults(row)) {
+        row.clear();
+      }
+      return row;
+    }
+
+    @Override
+    public int getStatus() {
+      return 0;
+    }
+
+    @Override
+    public String getVersion() {
+      return VERSION;
+    }
+  }
+
+  public static void main(String[] args) {
+    try {
+      int port = Integer.valueOf(args[0]).intValue();
+      TServerTransport serverTransport = new TServerSocket(port);
+      Iface handler = new HiveServerHandler();
+      FacebookService.Processor processor = new ThriftHive.Processor(handler);
+      TThreadPoolServer.Options options = new TThreadPoolServer.Options();
+      TServer server = new TThreadPoolServer(processor, serverTransport,
+          new TTransportFactory(), new TTransportFactory(),
+          new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), options);
+      HiveServerHandler.LOG.info("Started the new hive server on port " + port);
+      server.serve();
+    } catch (Exception x) {
+      x.printStackTrace();
+    }
+  }
+}
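For orientation (not part of the patch): a minimal standalone client sketch against a server started by HiveServer.main, assuming the default port 10000 used by start_hive_thrift_server.sh. It mirrors the disabled notestExecute case in TestHiveServer; the class name HiveClientExample is hypothetical.

import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransport;

import org.apache.hadoop.hive.service.HiveClient;

public class HiveClientExample {
  public static void main(String[] args) throws Exception {
    // assumes a HiveServer is already listening on localhost:10000
    TTransport transport = new TSocket("localhost", 10000);
    TProtocol protocol = new TBinaryProtocol(transport);
    HiveClient client = new HiveClient(protocol);
    transport.open();
    client.execute("select count(1) from testhivedrivertable");
    System.out.println(client.fetchOne());   // "500" for the kv1.txt data
    System.out.println(client.getSchema());  // "struct result { 0: bigint, }"
    transport.close();
  }
}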
Index: service/src/java/org/apache/hadoop/hive/service/HiveClient.java
===================================================================
--- service/src/java/org/apache/hadoop/hive/service/HiveClient.java	(revision 0)
+++ service/src/java/org/apache/hadoop/hive/service/HiveClient.java	(revision 0)
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.service.ThriftHive;
+import org.apache.hadoop.hive.service.ThriftHive.*;
+import org.apache.hadoop.hive.service.HiveServerException;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.FacebookService;
+import com.facebook.fb303.fb_status;
+import com.facebook.thrift.TException;
+import com.facebook.thrift.protocol.TProtocol;
+
+import org.apache.hadoop.hive.metastore.api.*;
+
+public class HiveClient extends ThriftHive.Client implements HiveInterface {
+  public HiveClient(TProtocol prot) {
+    super(prot, prot);
+  }
+}
Index: service/src/java/org/apache/hadoop/hive/service/HiveInterface.java
===================================================================
--- service/src/java/org/apache/hadoop/hive/service/HiveInterface.java	(revision 0)
+++ service/src/java/org/apache/hadoop/hive/service/HiveInterface.java	(revision 0)
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+public interface HiveInterface extends ThriftHive.Iface, org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface {
+}
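A design note (not part of the patch): because HiveServer.HiveServerHandler and HiveClient both implement HiveInterface, calling code can be written once and run either in-process or against a remote server, which is exactly how TestHiveServer switches between its embedded and networked variants. A minimal sketch; runQuery and EmbeddedExample are hypothetical names:

import org.apache.hadoop.hive.service.HiveInterface;
import org.apache.hadoop.hive.service.HiveServer;

public class EmbeddedExample {
  // works unchanged for an embedded handler or a connected HiveClient
  static String runQuery(HiveInterface hive, String query) throws Exception {
    hive.execute(query);
    return hive.fetchOne();
  }

  public static void main(String[] args) throws Exception {
    // embedded: drives Hive directly, no server process involved
    HiveInterface hive = new HiveServer.HiveServerHandler();
    System.out.println(runQuery(hive, "select count(1) from testhivedrivertable"));
  }
}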
Index: service/build.xml
===================================================================
--- service/build.xml	(revision 0)
+++ service/build.xml	(revision 0)
@@ -0,0 +1,51 @@
+
+
+
+
+
+
+
+
+
+    Executing thrift (which needs to be in your path) to build java metastore APIs....
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Index: service/if/hive_service.thrift
===================================================================
--- service/if/hive_service.thrift	(revision 0)
+++ service/if/hive_service.thrift	(revision 0)
@@ -0,0 +1,26 @@
+#!/usr/local/bin/thrift -java
+#
+# Thrift Service that the hive service is built on
+#
+
+#
+# TODO: include/thrift is shared among different components. It
+# should not be under metastore.
+#
+
+include "thrift/fb303/if/fb303.thrift"
+include "metastore/if/hive_metastore.thrift"
+
+namespace java org.apache.hadoop.hive.service
+namespace php hive
+
+exception HiveServerException {
+  string message
+}
+
+service ThriftHive extends hive_metastore.ThriftHiveMetastore {
+  void execute(1:string query) throws(1:HiveServerException ex)
+  string fetchOne() throws(1:HiveServerException ex)
+  list<string> fetchN(1:i32 numRows) throws(1:HiveServerException ex)
+  list<string> fetchAll() throws(1:HiveServerException ex)
+  string getSchema() throws(1:HiveServerException ex)
+}
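Per the IDL above, every call can throw HiveServerException. A hedged sketch (not part of the patch) of handling it; note that HiveServerHandler.execute currently throws the exception without filling in its message field, so there is no detail to report yet:

import org.apache.hadoop.hive.service.HiveServer;
import org.apache.hadoop.hive.service.HiveServerException;

public class ExecuteErrorExample {
  public static void main(String[] args) throws Exception {
    HiveServer.HiveServerHandler hive = new HiveServer.HiveServerHandler();
    try {
      hive.execute("select this is not hiveql"); // deliberately malformed
    } catch (HiveServerException e) {
      // thrown when driver.run returns non-zero or raises an exception
      System.err.println("query failed");
    }
  }
}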
Index: data/conf/hive-site.xml
===================================================================
--- data/conf/hive-site.xml	(revision 718182)
+++ data/conf/hive-site.xml	(working copy)
@@ -97,7 +97,7 @@
     test.data.files
-    ${user.dir}/../build/ql/test/data/files
+    ${user.dir}/../data/files
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java	(revision 718182)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java	(working copy)
@@ -99,6 +99,10 @@
     assert false;
     return 0;
   }
+
+  public Vector<ColumnInfo> getColumnInfos() {
+    return work.getColumnInfos();
+  }
 
   /**
    * A cache of InputFormat instances.
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 718182)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -1867,7 +1867,7 @@
     }
 
     this.loadFileWork.add(new loadFileDesc(queryTmpdir, dest_path,
-      (dest_type.intValue() == QBMetaData.DEST_DFS_FILE), cols));
+      (dest_type.intValue() == QBMetaData.DEST_DFS_FILE), cols, colInfos));
 
     table_desc = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols);
     break;
@@ -2860,7 +2860,7 @@
       Table tab = ((Map.Entry)iter.next()).getValue();
       if (!tab.isPartitioned()) {
         if (qb.getParseInfo().getDestToWhereExpr().isEmpty())
-          fetch = new fetchWork(tab.getPath(), Utilities.getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit());
+          fetch = new fetchWork(tab.getPath(), Utilities.getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit(), null);
       }
       else {
         if (aliasToPruner.size() == 1) {
@@ -2906,7 +2906,8 @@
             Utilities.makeProperties(
               org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
               "columns", cols)),
-          qb.getParseInfo().getOuterQueryLimit());
+          qb.getParseInfo().getOuterQueryLimit(),
+          loadFileWork.get(0).getColumnInfos());
 
       fetchTask = TaskFactory.get(fetch, this.conf);
       setFetchTask(fetchTask);
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/loadFileDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/loadFileDesc.java	(revision 718182)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/loadFileDesc.java	(working copy)
@@ -20,8 +20,10 @@
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Vector;
 
 import org.apache.hadoop.hive.ql.plan.loadDesc;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 
 public class loadFileDesc extends loadDesc implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -29,18 +31,21 @@
   private boolean isDfsDir;
   // list of columns, comma separated
   private String columns;
+  private Vector<ColumnInfo> colInfos;
 
   public loadFileDesc() { }
 
   public loadFileDesc(
     final String sourceDir,
    final String targetDir,
    final boolean isDfsDir,
-    final String columns) {
+    final String columns,
+    final Vector<ColumnInfo> colInfos) {
     super(sourceDir);
     this.targetDir = targetDir;
     this.isDfsDir = isDfsDir;
     this.columns = columns;
+    this.colInfos = colInfos;
   }
 
   @explain(displayName="destination")
@@ -66,6 +71,14 @@
     return columns;
   }
 
+  public Vector<ColumnInfo> getColumnInfos() {
+    return colInfos;
+  }
+
+  public void setColumnInfos(Vector<ColumnInfo> colInfos) {
+    this.colInfos = colInfos;
+  }
+
   /**
    * @param columns the columns to set
    */
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java	(revision 718182)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java	(working copy)
@@ -20,9 +20,11 @@
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Vector;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 
 @explain(displayName="Fetch Operator")
 public class fetchWork implements Serializable {
@@ -33,15 +35,17 @@
   private List partDir;
   private List partDesc;
+  private Vector<ColumnInfo> colInfos;
 
   private int limit;
 
   public fetchWork() { }
 
-  public fetchWork(Path tblDir, tableDesc tblDesc, int limit) {
+  public fetchWork(Path tblDir, tableDesc tblDesc, int limit, Vector<ColumnInfo> colInfos) {
     this.tblDir = tblDir;
     this.tblDesc = tblDesc;
     this.limit = limit;
+    this.colInfos = colInfos;
   }
 
   public fetchWork(List partDir, List partDesc, int limit) {
     this.partDir = partDir;
     this.partDesc = partDesc;
     this.limit = limit;
   }
-
+
+  public Vector<ColumnInfo> getColumnInfos() {
+    return colInfos;
+  }
+
+  public void setColumnInfos(Vector<ColumnInfo> colInfos) {
+    this.colInfos = colInfos;
+  }
+
   /**
    * @return the tblDir
    */
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 718182)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -34,6 +34,8 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.ByteStream;
@@ -68,6 +70,18 @@
     return jobs;
   }
 
+  public Vector<ColumnInfo> getResultColumnInfos() {
+    if (sem != null && sem.getFetchTask() != null) {
+      if (!sem.getFetchTaskInit()) {
+        sem.setFetchTaskInit(true);
+        sem.getFetchTask().initialize(conf);
+      }
+      FetchTask ft = (FetchTask)sem.getFetchTask();
+      return ft.getColumnInfos();
+    }
+    return null;
+  }
+
   public boolean hasReduceTasks(List<Task<? extends Serializable>> tasks) {
     if (tasks == null)
       return false;
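The plumbing above (loadFileDesc -> fetchWork -> FetchTask -> Driver.getResultColumnInfos) exists so HiveServerHandler.getSchema can describe the result set. A toy sketch (not part of the patch) of how that method assembles its string, with plain strings standing in for ColumnInfo objects; the "0: bigint" value matches the testEmbeddedExecute assertion:

import java.util.Vector;

public class SchemaStringExample {
  public static void main(String[] args) {
    // stand-in for what ColumnInfo.toString() yields for "select count(1)"
    Vector<String> colInfos = new Vector<String>();
    colInfos.add("0: bigint");

    // same concatenation as HiveServerHandler.getSchema
    StringBuilder schema = new StringBuilder("struct result { ");
    for (String colInfo : colInfos) {
      schema.append(colInfo).append(", ");
    }
    schema.append("}");
    System.out.println(schema); // struct result { 0: bigint, }
  }
}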