Index: build.xml
===================================================================
--- build.xml (revision 1069164)
+++ build.xml (working copy)
@@ -119,7 +119,7 @@
-
+
@@ -130,7 +130,7 @@
-
+
Index: service/src/java/org/apache/hadoop/hive/service/HiveServer.java
===================================================================
--- service/src/java/org/apache/hadoop/hive/service/HiveServer.java (revision 1069164)
+++ service/src/java/org/apache/hadoop/hive/service/HiveServer.java (working copy)
@@ -18,11 +18,17 @@
package org.apache.hadoop.hive.service;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
-import com.facebook.fb303.fb_status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,6 +54,8 @@
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
+import com.facebook.fb303.fb_status;
+
/**
* Thrift Hive Server Implementation.
*/
@@ -63,8 +71,14 @@
/**
* Hive server uses org.apache.hadoop.hive.ql.Driver for run() and
* getResults() methods.
+ * It holds the Driver instance of the last executed Hive query.
*/
- private final Driver driver;
+ private Driver driver;
+ /**
+ * Processors other than Hive queries (Driver) write their output to
+ * session.out (a temp file) first; the fetchOne/fetchN/fetchAll functions
+ * then read the output back through pipeIn.
+ */
+ private BufferedReader pipeIn;
/**
* Flag that indicates whether the last executed command was a Hive query.
@@ -80,14 +94,35 @@
super(HiveServer.class.getName());
isHiveQuery = false;
+ driver = null;
SessionState session = new SessionState(new HiveConf(SessionState.class));
SessionState.start(session);
- session.in = null;
- session.out = null;
- session.err = null;
- driver = new Driver();
+ setupSessionIO(session);
}
+ private void setupSessionIO(SessionState session) {
+ try {
+ LOG.info("Putting temp output to file " + session.getTmpOutputFile().toString());
+ session.in = null; // hive server's session input stream is not used
+ // open a per-session file in auto-flush mode for writing temp results
+ session.out = new PrintStream(new FileOutputStream(session.getTmpOutputFile()), true, "UTF-8");
+ // TODO: for hadoop jobs, progress is printed out to session.err,
+ // we should find a way to feed back job progress to client
+ session.err = new PrintStream(System.err, true, "UTF-8");
+ } catch (IOException e) {
+ LOG.error("Error in creating temp output file ", e);
+ try {
+ session.in = null;
+ session.out = new PrintStream(System.out, true, "UTF-8");
+ session.err = new PrintStream(System.err, true, "UTF-8");
+ } catch (UnsupportedEncodingException ee) {
+ ee.printStackTrace();
+ session.out = null;
+ session.err = null;
+ }
+ }
+ }
+
/**
* Executes a query.
*
@@ -96,7 +131,7 @@
*/
public void execute(String cmd) throws HiveServerException, TException {
HiveServerHandler.LOG.info("Running the query: " + cmd);
- SessionState.get();
+ SessionState session = SessionState.get();
String cmd_trimmed = cmd.trim();
String[] tokens = cmd_trimmed.split("\\s");
@@ -111,11 +146,14 @@
CommandProcessorResponse response = null;
if (proc != null) {
if (proc instanceof Driver) {
- ((Driver)proc).destroy();
isHiveQuery = true;
+ driver = (Driver) proc;
response = driver.run(cmd);
} else {
isHiveQuery = false;
+ driver = null;
+ // need to reset output for each non-Hive query
+ setupSessionIO(session);
response = proc.run(cmd_1);
}
@@ -126,6 +164,7 @@
} catch (Exception e) {
HiveServerException ex = new HiveServerException();
ex.setMessage("Error running query: " + e.toString());
+ ex.setErrorCode(ret == 0 ? -10000 : ret);
throw ex;
}
@@ -136,13 +175,32 @@
}
/**
+ * Should be called by the client at the end of a session.
+ */
+ public void clean() {
+ if (driver != null) {
+ driver.close();
+ driver.destroy();
+ }
+
+ SessionState session = SessionState.get();
+ if (session.getTmpOutputFile() != null) {
+ session.getTmpOutputFile().delete();
+ }
+ pipeIn = null;
+ }
+
+ /**
* Return the status information about the Map-Reduce cluster.
*/
public HiveClusterStatus getClusterStatus() throws HiveServerException,
TException {
HiveClusterStatus hcs;
try {
- ClusterStatus cs = driver.getClusterStatus();
+ Driver drv = new Driver();
+ drv.init();
+
+ ClusterStatus cs = drv.getClusterStatus();
JobTracker.State jbs = cs.getJobTrackerState();
// Convert the ClusterStatus to its Thrift equivalent: HiveClusterStatus
@@ -181,6 +239,8 @@
return new Schema();
}
+ assert driver != null: "getSchema() is called on a Hive query and driver is NULL.";
+
try {
Schema schema = driver.getSchema();
if (schema == null) {
@@ -206,6 +266,8 @@
return new Schema();
}
+ assert driver != null: "getThriftSchema() is called on a Hive query and driver is NULL.";
+
try {
Schema schema = driver.getThriftSchema();
if (schema == null) {
@@ -222,6 +284,7 @@
}
}
+
/**
* Fetches the next row in a query result set.
*
@@ -231,9 +294,17 @@
public String fetchOne() throws HiveServerException, TException {
if (!isHiveQuery) {
// Return no results if the last command was not a Hive query
- return "";
+ List<String> results = new ArrayList<String>(1);
+ readResults(results, 1);
+ if (results.size() > 0) {
+ return results.get(0);
+ } else { // throw an EOF exception
+ throw new HiveServerException("OK", 0, "");
+ }
}
+ assert driver != null: "fetchOne() is called on a Hive query and driver is NULL.";
+
ArrayList<String> result = new ArrayList<String>();
driver.setMaxRows(1);
try {
@@ -243,7 +314,8 @@
// TODO: Cannot return null here because thrift cannot handle nulls
// TODO: Returning empty string for now. Need to figure out how to
// TODO: return null in some other way
- return "";
+ throw new HiveServerException("OK", 0, "");
+ // return "";
} catch (IOException e) {
HiveServerException ex = new HiveServerException();
ex.setMessage(e.getMessage());
@@ -251,7 +323,58 @@
}
}
+ private void cleanTmpFile() {
+ if (pipeIn != null) {
+ SessionState session = SessionState.get();
+ File tmp = session.getTmpOutputFile();
+ tmp.delete();
+ pipeIn = null;
+ }
+ }
+
/**
+ * Reads the temporary results of non-Hive (non-Driver) commands into the
+ * given List of strings.
+ * @param results list of strings to which the results are appended
+ * @param nLines number of lines to read at once; if <= 0, read all lines.
+ */
+ private void readResults(List<String> results, int nLines) {
+
+ if (pipeIn == null) {
+ SessionState session = SessionState.get();
+ File tmp = session.getTmpOutputFile();
+ try {
+ pipeIn = new BufferedReader(new FileReader(tmp));
+ } catch (FileNotFoundException e) {
+ LOG.error("File " + tmp + " not found. ", e);
+ return;
+ }
+ }
+
+ boolean readAll = false;
+
+ for (int i = 0; i < nLines || nLines <= 0; ++i) {
+ try {
+ String line = pipeIn.readLine();
+ if (line == null) {
+ // reached the end of the result file
+ readAll = true;
+ break;
+ } else {
+ results.add(line);
+ }
+ } catch (IOException e) {
+ LOG.error("Reading temp results encountered an exception: ", e);
+ readAll = true;
+ }
+ }
+ if (readAll) {
+ cleanTmpFile();
+ }
+ }
+
+ /**
* Fetches numRows rows.
*
* @param numRows
@@ -270,12 +393,16 @@
ex.setMessage("Invalid argument for number of rows: " + numRows);
throw ex;
}
+
+ ArrayList<String> result = new ArrayList<String>();
+
if (!isHiveQuery) {
- // Return no results if the last command was not a Hive query
- return new ArrayList<String>();
+ readResults(result, numRows);
+ return result;
}
- ArrayList<String> result = new ArrayList<String>();
+ assert driver != null: "fetchN() is called on a Hive query and driver is NULL.";
+
driver.setMaxRows(numRows);
try {
driver.getResults(result);
@@ -298,13 +425,16 @@
* in the client.
*/
public List<String> fetchAll() throws HiveServerException, TException {
+
+ ArrayList<String> rows = new ArrayList<String>();
+ ArrayList<String> result = new ArrayList<String>();
+
if (!isHiveQuery) {
- // Return no results if the last command was not a Hive query
- return new ArrayList<String>();
+ // Return all results if numRows <= 0
+ readResults(result, 0);
+ return result;
}
- ArrayList<String> rows = new ArrayList<String>();
- ArrayList<String> result = new ArrayList<String>();
try {
while (driver.getResults(result)) {
rows.addAll(result);
@@ -337,6 +467,13 @@
@Override
public QueryPlan getQueryPlan() throws HiveServerException, TException {
QueryPlan qp = new QueryPlan();
+
+ if (!isHiveQuery) {
+ return qp;
+ }
+
+ assert driver != null: "getQueryPlan() is called on a Hive query and driver is NULL.";
+
// TODO for now only return one query at a time
// going forward, all queries associated with a single statement
// will be returned in a single QueryPlan
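
The temp-file "pipe" set up above is the core of this change: setupSessionIO()
points session.out at a per-session temp file through an auto-flushing UTF-8
PrintStream, and readResults() later streams the lines back through pipeIn in
batches. A minimal, self-contained sketch of that pattern (the class name and
output values are illustrative, not part of the patch):

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.PrintStream;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the temp-file pipe: a command processor writes to an
    // auto-flushing PrintStream backed by a temp file, and a later fetch
    // call reads the buffered lines back.
    public class TmpOutputPipeSketch {
      public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("session", ".pipeout");
        tmp.deleteOnExit();

        // Writer side: what setupSessionIO() wires up as session.out.
        PrintStream out = new PrintStream(new FileOutputStream(tmp), true, "UTF-8");
        out.println("hive.exec.mode.local.auto=false");
        out.println("mapred.reduce.tasks=-1");
        out.close();

        // Reader side: what readResults() does with pipeIn.
        BufferedReader pipeIn = new BufferedReader(new FileReader(tmp));
        List<String> results = new ArrayList<String>();
        String line;
        while ((line = pipeIn.readLine()) != null) {
          results.add(line);
        }
        pipeIn.close();
        tmp.delete(); // what cleanTmpFile()/clean() do once everything is read

        for (String r : results) {
          System.out.println(r);
        }
      }
    }
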
Index: service/src/gen/thrift/gen-py/hive_service/ThriftHive.py
===================================================================
--- service/src/gen/thrift/gen-py/hive_service/ThriftHive.py (revision 1069164)
+++ service/src/gen/thrift/gen-py/hive_service/ThriftHive.py (working copy)
@@ -49,7 +49,10 @@
def getQueryPlan(self, ):
pass
+ def clean(self, ):
+ pass
+
class Client(hive_metastore.ThriftHiveMetastore.Client, Iface):
def __init__(self, iprot, oprot=None):
hive_metastore.ThriftHiveMetastore.Client.__init__(self, iprot, oprot)
@@ -278,7 +281,30 @@
raise result.ex
raise TApplicationException(TApplicationException.MISSING_RESULT, "getQueryPlan failed: unknown result");
+ def clean(self, ):
+ self.send_clean()
+ self.recv_clean()
+ def send_clean(self, ):
+ self._oprot.writeMessageBegin('clean', TMessageType.CALL, self._seqid)
+ args = clean_args()
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_clean(self, ):
+ (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ if mtype == TMessageType.EXCEPTION:
+ x = TApplicationException()
+ x.read(self._iprot)
+ self._iprot.readMessageEnd()
+ raise x
+ result = clean_result()
+ result.read(self._iprot)
+ self._iprot.readMessageEnd()
+ return
+
+
class Processor(hive_metastore.ThriftHiveMetastore.Processor, Iface, TProcessor):
def __init__(self, handler):
hive_metastore.ThriftHiveMetastore.Processor.__init__(self, handler)
@@ -290,6 +316,7 @@
self._processMap["getThriftSchema"] = Processor.process_getThriftSchema
self._processMap["getClusterStatus"] = Processor.process_getClusterStatus
self._processMap["getQueryPlan"] = Processor.process_getQueryPlan
+ self._processMap["clean"] = Processor.process_clean
def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
@@ -418,7 +445,18 @@
oprot.writeMessageEnd()
oprot.trans.flush()
+ def process_clean(self, seqid, iprot, oprot):
+ args = clean_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = clean_result()
+ self._handler.clean()
+ oprot.writeMessageBegin("clean", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
# HELPER FUNCTIONS AND STRUCTURES
class execute_args:
@@ -1361,3 +1399,85 @@
def __ne__(self, other):
return not (self == other)
+
+class clean_args:
+
+ thrift_spec = (
+ )
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('clean_args')
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class clean_result:
+
+ thrift_spec = (
+ )
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('clean_result')
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
Index: service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
===================================================================
--- service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote (revision 1069164)
+++ service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote (working copy)
@@ -29,6 +29,7 @@
print ' Schema getThriftSchema()'
print ' HiveClusterStatus getClusterStatus()'
print ' QueryPlan getQueryPlan()'
+ print ' void clean()'
print ''
sys.exit(0)
@@ -127,6 +128,12 @@
sys.exit(1)
pp.pprint(client.getQueryPlan())
+elif cmd == 'clean':
+ if len(args) != 0:
+ print 'clean requires 0 args'
+ sys.exit(1)
+ pp.pprint(client.clean())
+
else:
print 'Unrecognized method %s' % cmd
sys.exit(1)
Index: service/src/gen/thrift/gen-cpp/ThriftHive.h
===================================================================
--- service/src/gen/thrift/gen-cpp/ThriftHive.h (revision 1069164)
+++ service/src/gen/thrift/gen-cpp/ThriftHive.h (working copy)
@@ -23,6 +23,7 @@
virtual void getThriftSchema(Apache::Hadoop::Hive::Schema& _return) = 0;
virtual void getClusterStatus(HiveClusterStatus& _return) = 0;
virtual void getQueryPlan(Apache::Hadoop::Hive::QueryPlan& _return) = 0;
+ virtual void clean() = 0;
};
class ThriftHiveNull : virtual public ThriftHiveIf , virtual public Apache::Hadoop::Hive::ThriftHiveMetastoreNull {
@@ -52,6 +53,9 @@
void getQueryPlan(Apache::Hadoop::Hive::QueryPlan& /* _return */) {
return;
}
+ void clean() {
+ return;
+ }
};
typedef struct _ThriftHive_execute_args__isset {
@@ -836,6 +840,80 @@
};
+
+class ThriftHive_clean_args {
+ public:
+
+ ThriftHive_clean_args() {
+ }
+
+ virtual ~ThriftHive_clean_args() throw() {}
+
+
+ bool operator == (const ThriftHive_clean_args & /* rhs */) const
+ {
+ return true;
+ }
+ bool operator != (const ThriftHive_clean_args &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const ThriftHive_clean_args & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+
+class ThriftHive_clean_pargs {
+ public:
+
+
+ virtual ~ThriftHive_clean_pargs() throw() {}
+
+
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+
+class ThriftHive_clean_result {
+ public:
+
+ ThriftHive_clean_result() {
+ }
+
+ virtual ~ThriftHive_clean_result() throw() {}
+
+
+ bool operator == (const ThriftHive_clean_result & /* rhs */) const
+ {
+ return true;
+ }
+ bool operator != (const ThriftHive_clean_result &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const ThriftHive_clean_result & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+
+class ThriftHive_clean_presult {
+ public:
+
+
+ virtual ~ThriftHive_clean_presult() throw() {}
+
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+
+};
+
class ThriftHiveClient : virtual public ThriftHiveIf, public Apache::Hadoop::Hive::ThriftHiveMetastoreClient {
public:
ThriftHiveClient(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :
@@ -872,6 +950,9 @@
void getQueryPlan(Apache::Hadoop::Hive::QueryPlan& _return);
void send_getQueryPlan();
void recv_getQueryPlan(Apache::Hadoop::Hive::QueryPlan& _return);
+ void clean();
+ void send_clean();
+ void recv_clean();
};
class ThriftHiveProcessor : virtual public ::apache::thrift::TProcessor, public Apache::Hadoop::Hive::ThriftHiveMetastoreProcessor {
@@ -888,6 +969,7 @@
void process_getThriftSchema(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);
void process_getClusterStatus(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);
void process_getQueryPlan(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);
+ void process_clean(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);
public:
ThriftHiveProcessor(boost::shared_ptr<ThriftHiveIf> iface) :
Apache::Hadoop::Hive::ThriftHiveMetastoreProcessor(iface),
@@ -900,6 +982,7 @@
processMap_["getThriftSchema"] = &ThriftHiveProcessor::process_getThriftSchema;
processMap_["getClusterStatus"] = &ThriftHiveProcessor::process_getClusterStatus;
processMap_["getQueryPlan"] = &ThriftHiveProcessor::process_getQueryPlan;
+ processMap_["clean"] = &ThriftHiveProcessor::process_clean;
}
virtual bool process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);
@@ -1014,6 +1097,13 @@
}
}
+ void clean() {
+ uint32_t sz = ifaces_.size();
+ for (uint32_t i = 0; i < sz; ++i) {
+ ifaces_[i]->clean();
+ }
+ }
+
};
}}} // namespace
Index: service/src/gen/thrift/gen-cpp/ThriftHive_server.skeleton.cpp
===================================================================
--- service/src/gen/thrift/gen-cpp/ThriftHive_server.skeleton.cpp (revision 1069164)
+++ service/src/gen/thrift/gen-cpp/ThriftHive_server.skeleton.cpp (working copy)
@@ -62,6 +62,11 @@
printf("getQueryPlan\n");
}
+ void clean() {
+ // Your implementation goes here
+ printf("clean\n");
+ }
+
};
int main(int argc, char **argv) {
Index: service/src/gen/thrift/gen-cpp/ThriftHive.cpp
===================================================================
--- service/src/gen/thrift/gen-cpp/ThriftHive.cpp (revision 1069164)
+++ service/src/gen/thrift/gen-cpp/ThriftHive.cpp (working copy)
@@ -1391,6 +1391,129 @@
return xfer;
}
+uint32_t ThriftHive_clean_args::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t ThriftHive_clean_args::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ xfer += oprot->writeStructBegin("ThriftHive_clean_args");
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+uint32_t ThriftHive_clean_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ xfer += oprot->writeStructBegin("ThriftHive_clean_pargs");
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+uint32_t ThriftHive_clean_result::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t ThriftHive_clean_result::write(::apache::thrift::protocol::TProtocol* oprot) const {
+
+ uint32_t xfer = 0;
+
+ xfer += oprot->writeStructBegin("ThriftHive_clean_result");
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+uint32_t ThriftHive_clean_presult::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
void ThriftHiveClient::execute(const std::string& query)
{
send_execute(query);
@@ -1884,6 +2007,60 @@
throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "getQueryPlan failed: unknown result");
}
+void ThriftHiveClient::clean()
+{
+ send_clean();
+ recv_clean();
+}
+
+void ThriftHiveClient::send_clean()
+{
+ int32_t cseqid = 0;
+ oprot_->writeMessageBegin("clean", ::apache::thrift::protocol::T_CALL, cseqid);
+
+ ThriftHive_clean_pargs args;
+ args.write(oprot_);
+
+ oprot_->writeMessageEnd();
+ oprot_->getTransport()->flush();
+ oprot_->getTransport()->writeEnd();
+}
+
+void ThriftHiveClient::recv_clean()
+{
+
+ int32_t rseqid = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TMessageType mtype;
+
+ iprot_->readMessageBegin(fname, mtype, rseqid);
+ if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
+ ::apache::thrift::TApplicationException x;
+ x.read(iprot_);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+ throw x;
+ }
+ if (mtype != ::apache::thrift::protocol::T_REPLY) {
+ iprot_->skip(::apache::thrift::protocol::T_STRUCT);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+ throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);
+ }
+ if (fname.compare("clean") != 0) {
+ iprot_->skip(::apache::thrift::protocol::T_STRUCT);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+ throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);
+ }
+ ThriftHive_clean_presult result;
+ result.read(iprot_);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+
+ return;
+}
+
bool ThriftHiveProcessor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {
::apache::thrift::protocol::TProtocol* iprot = piprot.get();
@@ -2167,5 +2344,32 @@
oprot->getTransport()->writeEnd();
}
+void ThriftHiveProcessor::process_clean(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)
+{
+ ThriftHive_clean_args args;
+ args.read(iprot);
+ iprot->readMessageEnd();
+ iprot->getTransport()->readEnd();
+
+ ThriftHive_clean_result result;
+ try {
+ iface_->clean();
+ } catch (const std::exception& e) {
+ ::apache::thrift::TApplicationException x(e.what());
+ oprot->writeMessageBegin("clean", ::apache::thrift::protocol::T_EXCEPTION, seqid);
+ x.write(oprot);
+ oprot->writeMessageEnd();
+ oprot->getTransport()->flush();
+ oprot->getTransport()->writeEnd();
+ return;
+ }
+
+ oprot->writeMessageBegin("clean", ::apache::thrift::protocol::T_REPLY, seqid);
+ result.write(oprot);
+ oprot->writeMessageEnd();
+ oprot->getTransport()->flush();
+ oprot->getTransport()->writeEnd();
+}
+
}}} // namespace
Index: service/src/gen/thrift/gen-rb/thrift_hive.rb
===================================================================
--- service/src/gen/thrift/gen-rb/thrift_hive.rb (revision 1069164)
+++ service/src/gen/thrift/gen-rb/thrift_hive.rb (working copy)
@@ -139,6 +139,20 @@
raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'getQueryPlan failed: unknown result')
end
+ def clean()
+ send_clean()
+ recv_clean()
+ end
+
+ def send_clean()
+ send_message('clean', Clean_args)
+ end
+
+ def recv_clean()
+ result = receive_message(Clean_result)
+ return
+ end
+
end
class Processor < ThriftHiveMetastore::Processor
@@ -232,6 +246,13 @@
write_result(result, oprot, 'getQueryPlan', seqid)
end
+ def process_clean(seqid, iprot, oprot)
+ args = read_args(iprot, Clean_args)
+ result = Clean_result.new()
+ @handler.clean()
+ write_result(result, oprot, 'clean', seqid)
+ end
+
end
# HELPER FUNCTIONS AND STRUCTURES
@@ -500,5 +521,35 @@
::Thrift::Struct.generate_accessors self
end
+ class Clean_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+
+ FIELDS = {
+
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Clean_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+
+ FIELDS = {
+
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
end
Index: service/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/service/ThriftHive.java
===================================================================
--- service/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/service/ThriftHive.java (revision 1069164)
+++ service/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/service/ThriftHive.java (working copy)
@@ -46,6 +46,8 @@
public org.apache.hadoop.hive.ql.plan.api.QueryPlan getQueryPlan() throws HiveServerException, TException;
+ public void clean() throws TException;
+
}
public interface AsyncIface extends org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.AsyncIface {
@@ -66,6 +68,8 @@
public void getQueryPlan(AsyncMethodCallback<AsyncClient.getQueryPlan_call> resultHandler) throws TException;
+ public void clean(AsyncMethodCallback<AsyncClient.clean_call> resultHandler) throws TException;
+
}
public static class Client extends org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Client implements TServiceClient, Iface {
@@ -392,6 +396,38 @@
throw new TApplicationException(TApplicationException.MISSING_RESULT, "getQueryPlan failed: unknown result");
}
+ public void clean() throws TException
+ {
+ send_clean();
+ recv_clean();
+ }
+
+ public void send_clean() throws TException
+ {
+ oprot_.writeMessageBegin(new TMessage("clean", TMessageType.CALL, ++seqid_));
+ clean_args args = new clean_args();
+ args.write(oprot_);
+ oprot_.writeMessageEnd();
+ oprot_.getTransport().flush();
+ }
+
+ public void recv_clean() throws TException
+ {
+ TMessage msg = iprot_.readMessageBegin();
+ if (msg.type == TMessageType.EXCEPTION) {
+ TApplicationException x = TApplicationException.read(iprot_);
+ iprot_.readMessageEnd();
+ throw x;
+ }
+ if (msg.seqid != seqid_) {
+ throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, "clean failed: out of sequence response");
+ }
+ clean_result result = new clean_result();
+ result.read(iprot_);
+ iprot_.readMessageEnd();
+ return;
+ }
+
}
public static class AsyncClient extends org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.AsyncClient implements AsyncIface {
public static class Factory implements TAsyncClientFactory<AsyncClient> {
@@ -640,6 +676,34 @@
}
}
+ public void clean(AsyncMethodCallback<clean_call> resultHandler) throws TException {
+ checkReady();
+ clean_call method_call = new clean_call(resultHandler, this, protocolFactory, transport);
+ manager.call(method_call);
+ }
+
+ public static class clean_call extends TAsyncMethodCall {
+ public clean_call(AsyncMethodCallback<clean_call> resultHandler, TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport) throws TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ }
+
+ public void write_args(TProtocol prot) throws TException {
+ prot.writeMessageBegin(new TMessage("clean", TMessageType.CALL, 0));
+ clean_args args = new clean_args();
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public void getResult() throws TException {
+ if (getState() != State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ TMemoryInputTransport memoryTransport = new TMemoryInputTransport(getFrameBuffer().array());
+ TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ (new Client(prot)).recv_clean();
+ }
+ }
+
}
public static class Processor extends org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Processor implements TProcessor {
@@ -656,6 +720,7 @@
processMap_.put("getThriftSchema", new getThriftSchema());
processMap_.put("getClusterStatus", new getClusterStatus());
processMap_.put("getQueryPlan", new getQueryPlan());
+ processMap_.put("clean", new clean());
}
private Iface iface_;
@@ -982,6 +1047,32 @@
}
+ private class clean implements ProcessFunction {
+ public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
+ {
+ clean_args args = new clean_args();
+ try {
+ args.read(iprot);
+ } catch (TProtocolException e) {
+ iprot.readMessageEnd();
+ TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+ oprot.writeMessageBegin(new TMessage("clean", TMessageType.EXCEPTION, seqid));
+ x.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ return;
+ }
+ iprot.readMessageEnd();
+ clean_result result = new clean_result();
+ iface_.clean();
+ oprot.writeMessageBegin(new TMessage("clean", TMessageType.REPLY, seqid));
+ result.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ }
+
+ }
+
}
public static class execute_args implements TBase, java.io.Serializable, Cloneable {
@@ -5553,4 +5644,371 @@
}
+ public static class clean_args implements TBase<clean_args, clean_args._Fields>, java.io.Serializable, Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("clean_args");
+
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it's not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it's not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ FieldMetaData.addStructMetaDataMap(clean_args.class, metaDataMap);
+ }
+
+ public clean_args() {
+ }
+
+ /**
+ * Performs a deep copy on other.
+ */
+ public clean_args(clean_args other) {
+ }
+
+ public clean_args deepCopy() {
+ return new clean_args(this);
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof clean_args)
+ return this.equals((clean_args)that);
+ return false;
+ }
+
+ public boolean equals(clean_args that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(clean_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ clean_args typedOther = (clean_args)other;
+
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("clean_args(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
+ public static class clean_result implements TBase<clean_result, clean_result._Fields>, java.io.Serializable, Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("clean_result");
+
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it's not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it's not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ FieldMetaData.addStructMetaDataMap(clean_result.class, metaDataMap);
+ }
+
+ public clean_result() {
+ }
+
+ /**
+ * Performs a deep copy on other.
+ */
+ public clean_result(clean_result other) {
+ }
+
+ public clean_result deepCopy() {
+ return new clean_result(this);
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof clean_result)
+ return this.equals((clean_result)that);
+ return false;
+ }
+
+ public boolean equals(clean_result that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(clean_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ clean_result typedOther = (clean_result)other;
+
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("clean_result(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
}
Index: service/src/gen/thrift/gen-php/hive_service/ThriftHive.php
===================================================================
--- service/src/gen/thrift/gen-php/hive_service/ThriftHive.php (revision 1069164)
+++ service/src/gen/thrift/gen-php/hive_service/ThriftHive.php (working copy)
@@ -18,6 +18,7 @@
public function getThriftSchema();
public function getClusterStatus();
public function getQueryPlan();
+ public function clean();
}
class ThriftHiveClient extends ThriftHiveMetastoreClient implements ThriftHiveIf {
@@ -448,6 +449,53 @@
throw new Exception("getQueryPlan failed: unknown result");
}
+ public function clean()
+ {
+ $this->send_clean();
+ $this->recv_clean();
+ }
+
+ public function send_clean()
+ {
+ $args = new ThriftHive_clean_args();
+ $bin_accel = ($this->output_ instanceof TProtocol::$TBINARYPROTOCOLACCELERATED) && function_exists('thrift_protocol_write_binary');
+ if ($bin_accel)
+ {
+ thrift_protocol_write_binary($this->output_, 'clean', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite());
+ }
+ else
+ {
+ $this->output_->writeMessageBegin('clean', TMessageType::CALL, $this->seqid_);
+ $args->write($this->output_);
+ $this->output_->writeMessageEnd();
+ $this->output_->getTransport()->flush();
+ }
+ }
+
+ public function recv_clean()
+ {
+ $bin_accel = ($this->input_ instanceof TProtocol::$TBINARYPROTOCOLACCELERATED) && function_exists('thrift_protocol_read_binary');
+ if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, 'ThriftHive_clean_result', $this->input_->isStrictRead());
+ else
+ {
+ $rseqid = 0;
+ $fname = null;
+ $mtype = 0;
+
+ $this->input_->readMessageBegin($fname, $mtype, $rseqid);
+ if ($mtype == TMessageType::EXCEPTION) {
+ $x = new TApplicationException();
+ $x->read($this->input_);
+ $this->input_->readMessageEnd();
+ throw $x;
+ }
+ $result = new ThriftHive_clean_result();
+ $result->read($this->input_);
+ $this->input_->readMessageEnd();
+ }
+ return;
+ }
+
}
// HELPER FUNCTIONS AND STRUCTURES
@@ -1700,4 +1748,104 @@
}
+class ThriftHive_clean_args {
+ static $_TSPEC;
+
+
+ public function __construct() {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ );
+ }
+ }
+
+ public function getName() {
+ return 'ThriftHive_clean_args';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('ThriftHive_clean_args');
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
+class ThriftHive_clean_result {
+ static $_TSPEC;
+
+
+ public function __construct() {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ );
+ }
+ }
+
+ public function getName() {
+ return 'ThriftHive_clean_result';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('ThriftHive_clean_result');
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
?>
Index: service/if/hive_service.thrift
===================================================================
--- service/if/hive_service.thrift (revision 1069164)
+++ service/if/hive_service.thrift (working copy)
@@ -81,4 +81,6 @@
# Get the queryplan annotated with counter information
queryplan.QueryPlan getQueryPlan() throws(1:HiveServerException ex)
+ # clean up the last Hive query (releasing locks, etc.)
+ void clean()
}
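
With clean() in the IDL, the full client lifecycle becomes execute -> fetch* ->
clean. A sketch of that round trip using the generated Java bindings; the host,
port, and command are placeholder values:

    import java.util.List;

    import org.apache.hadoop.hive.service.HiveClient;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    // Sketch of the execute/fetchN/clean lifecycle against a remote
    // HiveServer, mirroring what CliDriver's remote mode (below) does.
    public class HiveClientSketch {
      public static void main(String[] args) throws Exception {
        TTransport transport = new TSocket("localhost", 10000); // placeholders
        HiveClient client = new HiveClient(new TBinaryProtocol(transport));
        transport.open();
        try {
          // a non-Driver command: its results travel through the temp file
          client.execute("set -v");
          List<String> batch;
          do {
            batch = client.fetchN(40);
            for (String row : batch) {
              System.out.println(row);
            }
          } while (batch.size() == 40);
        } finally {
          client.clean(); // release the Driver and delete the temp output file
          transport.close();
        }
      }
    }
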
Index: cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
===================================================================
--- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (revision 1069164)
+++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (working copy)
@@ -27,15 +27,14 @@
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import jline.Completor;
import jline.ArgumentCompletor;
+import jline.ArgumentCompletor.AbstractArgumentDelimiter;
import jline.ArgumentCompletor.ArgumentDelimiter;
-import jline.ArgumentCompletor.AbstractArgumentDelimiter;
+import jline.Completor;
import jline.ConsoleReader;
import jline.History;
import jline.SimpleCompletor;
@@ -45,18 +44,21 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.service.HiveClient;
+import org.apache.hadoop.hive.service.HiveServerException;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.thrift.TException;
/**
* CliDriver.
@@ -64,8 +66,9 @@
*/
public class CliDriver {
- public static final String prompt = "hive";
- public static final String prompt2 = " "; // when ';' is not yet seen
+ public static String prompt = "hive";
+ public static String prompt2 = " "; // when ';' is not yet seen
+ public static final int LINES_TO_FETCH = 40; // number of lines to fetch in batch from remote hive server
public static final String HIVERCFILE = ".hiverc";
@@ -80,7 +83,7 @@
}
public int processCmd(String cmd) {
- SessionState ss = SessionState.get();
+ CliSessionState ss = (CliSessionState) SessionState.get();
String cmd_trimmed = cmd.trim();
String[] tokens = cmd_trimmed.split("\\s+");
@@ -149,8 +152,50 @@
ss.out.println(StringUtils.join(s, "\n"));
}
}
+ } else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server
+ HiveClient client = ss.getClient();
+ PrintStream out = ss.out;
+ PrintStream err = ss.err;
- } else {
+ try {
+ client.execute(cmd_trimmed);
+ List<String> results;
+ do {
+ results = client.fetchN(LINES_TO_FETCH);
+ for (String line: results) {
+ out.println(line);
+ }
+ } while (results.size() == LINES_TO_FETCH);
+ } catch (HiveServerException e) {
+ ret = e.getErrorCode();
+ if (ret != 0) { // OK if ret == 0 -- reached the EOF
+ String errMsg = e.getMessage();
+ if (errMsg == null) {
+ errMsg = e.toString();
+ }
+ err.println("[Hive Error]: " + errMsg);
+ }
+ } catch (TException e) {
+ String errMsg = e.getMessage();
+ if (errMsg == null) {
+ errMsg = e.toString();
+ }
+ ret = -10002;
+ err.println("[Thrift Error]: " + errMsg);
+ } finally {
+ try {
+ client.clean();
+ } catch (TException e) {
+ String errMsg = e.getMessage();
+ if (errMsg == null) {
+ errMsg = e.toString();
+ }
+ err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
+ + errMsg);
+ }
+ }
+ } else { // local mode
CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf);
if (proc != null) {
if (proc instanceof Driver) {
@@ -168,7 +213,7 @@
}
ArrayList<String> res = new ArrayList<String>();
-
+
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) {
// Print the column names
boolean first_col = true;
@@ -332,6 +377,7 @@
// as a keyword delimiter, we need to define a new ArgumentDelimiter
// that recognizes parenthesis as a delimiter.
ArgumentDelimiter delim = new AbstractArgumentDelimiter () {
+ @Override
public boolean isDelimiterChar (String buffer, int pos) {
char c = buffer.charAt(pos);
return (Character.isWhitespace(c) || c == '(' || c == ')' ||
@@ -367,7 +413,7 @@
return ret;
}
};
-
+
return completor;
}
@@ -416,6 +462,17 @@
SessionState.start(ss);
+ // connect to Hive Server
+ if (ss.getHost() != null) {
+ ss.connect();
+ if (ss.isRemoteMode()) {
+ prompt = "[" + ss.host + ':' + ss.port + "] " + prompt;
+ char[] spaces = new char[prompt.length()];
+ Arrays.fill(spaces, ' ');
+ prompt2 = new String(spaces);
+ }
+ }
+
CliDriver cli = new CliDriver();
// Execute -i init files (always in silent mode)
@@ -463,6 +520,8 @@
}
}
+ ss.close();
+
System.exit(ret);
}
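
Remote mode is selected with the new -h/-p flags handled by OptionsProcessor
(next file). One subtlety the loop above relies on: because Thrift cannot
return null, fetchOne() signals end-of-data by throwing
HiveServerException("OK", 0, ""), so callers must treat errorCode 0 as EOF
rather than as a failure -- that is what the ret != 0 check does. A small
sketch of a caller honoring this convention (the class and method names are
illustrative):

    import org.apache.hadoop.hive.service.HiveClient;
    import org.apache.hadoop.hive.service.HiveServerException;

    // Sketch of the EOF convention of fetchOne(): errorCode == 0 means
    // "no more rows", anything else is a real server-side error.
    public class FetchLoopSketch {
      public static void printAll(HiveClient client) throws Exception {
        while (true) {
          try {
            System.out.println(client.fetchOne());
          } catch (HiveServerException e) {
            if (e.getErrorCode() == 0) {
              break; // EOF: the result set is exhausted
            }
            throw e; // a genuine error, propagate it
          }
        }
      }
    }
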
Index: cli/src/java/org/apache/hadoop/hive/cli/OptionsProcessor.java
===================================================================
--- cli/src/java/org/apache/hadoop/hive/cli/OptionsProcessor.java (revision 1069164)
+++ cli/src/java/org/apache/hadoop/hive/cli/OptionsProcessor.java (working copy)
@@ -39,7 +39,7 @@
/**
* OptionsProcessor.
- *
+ *
*/
public class OptionsProcessor {
@@ -47,7 +47,7 @@
private final Parser parser = new Parser();
private final Option confOptions, initFilesOption, isSilentOption,
- execOption, fileOption, isHelpOption, isVerboseOption;
+ execOption, fileOption, isHelpOption, isVerboseOption, hostOption, portOption;
/**
* Shamelessly cloned from Hadoop streaming take in multiple -hiveconf x=y parameters.
@@ -154,7 +154,7 @@
isVerboseOption = createBoolOption(builder, "verbose", "v", "verbose mode");
// -help
- isHelpOption = createBoolOption(builder, "help", "h", "help");
+ isHelpOption = createBoolOption(builder, "help", "H", "help");
// -hiveconf var=val
confOptions = new MultiPropertyOption("-hiveconf",
@@ -163,12 +163,29 @@
initFilesOption = new MultiPropertyOption(
"-i", "File to run before other commands", 'I', false);
+ // -h
+ hostOption = createOptionWithArg(builder, "host", "h",
+ "connecting to Hive Server on host",
+ argBuilder.withMinimum(1).withMaximum(1).create());
+
+ // -p
+ portOption = createOptionWithArg(builder, "port", "p",
+ "connecting to Hive Server on port",
+ argBuilder.withMinimum(1).withMaximum(1).create());
+
new PropertyOption();
Group allOptions =
- new GroupBuilder().withOption(confOptions).withOption(initFilesOption)
- .withOption(isSilentOption).withOption(isHelpOption)
- .withOption(execOption).withOption(fileOption)
- .withOption(isVerboseOption).create();
+ new GroupBuilder()
+ .withOption(confOptions)
+ .withOption(initFilesOption)
+ .withOption(isSilentOption)
+ .withOption(isHelpOption)
+ .withOption(hostOption)
+ .withOption(portOption)
+ .withOption(execOption)
+ .withOption(fileOption)
+ .withOption(isVerboseOption)
+ .create();
parser.setGroup(allOptions);
}
@@ -207,11 +224,16 @@
if (null != initFiles) {
ss.initFiles = initFiles;
}
- // -h
+ // -H
if (cmdLine.hasOption(isHelpOption)) {
printUsage(null);
return false;
}
+
+ // -h, -p
+ ss.host = (String) cmdLine.getValue(hostOption);
+ ss.port = Integer.parseInt((String) cmdLine.getValue(portOption, "10000"));
+
if (ss.execString != null && ss.fileName != null) {
printUsage("-e and -f option cannot be specified simultaneously");
return false;
@@ -241,10 +263,15 @@
System.err.println(" -e 'quoted query string' Sql from command line");
System.err.println(" -f Sql from files");
System.err.println(" -S Silent mode in interactive shell");
+ System.err.println(" -h hostname running the Hive server (remote mode)");
+ System.err.println(" -p port number (default 10000) to which the Hive server is listening (remote mode only)");
System.err.println(" -v Verbose mode (echo executed Sql to the console)");
+ System.err.println(" -H this help message");
System.err.println("");
System.err.println("-e and -f cannot be specified together. In the absence of these");
System.err.println(" options, interactive shell is started.");
+ System.err.println(" By default Hive CLI is running in local mode, if -h is specified it will be");
+ System.err.println(" running in remote mode, which try to connect to the Hive server on remote host.");
System.err.println("");
}
Index: cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java
===================================================================
--- cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java (revision 1069164)
+++ cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java (working copy)
@@ -24,10 +24,17 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.service.HiveClient;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
/**
* CliSessionState.
- *
+ *
*/
public class CliSessionState extends SessionState {
/**
@@ -50,12 +57,65 @@
*/
public List<String> initFiles = new ArrayList<String>();
+ /**
+ * Host name and port number of the remote Hive server.
+ */
+ protected String host;
+ protected int port;
+
+ private boolean remoteMode;
+
+ private TTransport transport;
+ private HiveClient client;
+
public CliSessionState() {
super();
+ remoteMode = false;
}
public CliSessionState(HiveConf conf) {
super(conf);
+ remoteMode = false;
}
+ /**
+ * Connect to Hive Server
+ */
+ public void connect() throws TTransportException {
+ transport = new TSocket(host, port);
+ TProtocol protocol = new TBinaryProtocol(transport);
+ client = new HiveClient(protocol);
+ transport.open();
+ remoteMode = true;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void close() {
+ try {
+ if (remoteMode) {
+ // only touch client/transport if connect() was actually called,
+ // since CliDriver calls close() in local mode as well
+ client.clean();
+ client.shutdown();
+ transport.close();
+ }
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public boolean isRemoteMode() {
+ return remoteMode;
+ }
+
+ public HiveClient getClient() {
+ return client;
+ }
}
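
Taken together, the remote-session lifecycle that CliDriver.main() drives
looks like the sketch below. It assumes it is compiled into the
org.apache.hadoop.hive.cli package (so the protected port field is visible);
host, port, and the command are placeholder values:

    package org.apache.hadoop.hive.cli;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.session.SessionState;

    // Sketch of connect()/close() around a remote command, mirroring
    // CliDriver.main(): -h/-p populate host/port, connect() enters remote
    // mode, close() releases server-side state.
    public class RemoteSessionSketch {
      public static void main(String[] args) throws Exception {
        CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
        ss.setHost("localhost"); // placeholder; normally set from -h
        ss.port = 10000;         // placeholder; normally set from -p
        SessionState.start(ss);

        if (ss.getHost() != null) {
          ss.connect(); // opens the TSocket and switches to remote mode
        }
        if (ss.isRemoteMode()) {
          ss.getClient().execute("show tables"); // placeholder command
        }
        ss.close(); // clean() + shutdown() on the server, then close the socket
      }
    }
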
Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (revision 1069164)
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (working copy)
@@ -77,6 +77,7 @@
* HiveHistory Object
*/
protected HiveHistory hiveHist;
+
/**
* Streams to read/write from.
*/
@@ -85,16 +86,22 @@
public PrintStream err;
/**
+ * Temporary file used to store the results of non-Hive commands (e.g., set, dfs);
+ * the HiveServer.fetch*() functions read results back from this file.
+ */
+ protected File tmpOutputFile;
+
+ /**
* type of the command.
*/
private HiveOperation commandType;
-
+
private HiveAuthorizationProvider authorizer;
-
+
private HiveAuthenticationProvider authenticator;
-
+
private CreateTableAutomaticGrant createTableGrants;
-
+
/**
* Lineage state.
*/
@@ -117,6 +124,14 @@
this.conf = conf;
}
+ public File getTmpOutputFile() {
+ return tmpOutputFile;
+ }
+
+ public void setTmpOutputFile(File f) {
+ tmpOutputFile = f;
+ }
+
public boolean getIsSilent() {
if(conf != null) {
return conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
@@ -174,29 +189,22 @@
/**
* start a new session and set it to current session.
- * @throws HiveException
*/
- public static SessionState start(HiveConf conf) throws HiveException {
+ public static SessionState start(HiveConf conf) {
SessionState ss = new SessionState(conf);
- ss.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId());
- ss.hiveHist = new HiveHistory(ss);
- ss.authenticator = HiveUtils.getAuthenticator(conf);
- ss.authorizer = HiveUtils.getAuthorizeProviderManager(
- conf, ss.authenticator);
- ss.createTableGrants = CreateTableAutomaticGrant.create(conf);
- tss.set(ss);
- return (ss);
+ return start(ss);
}
/**
* set current session to existing session object if a thread is running
* multiple sessions - it must call this method with the new session object
* when switching from one session to another.
- * @throws HiveException
+ * @throws HiveException
*/
public static SessionState start(SessionState startSs) {
tss.set(startSs);
+
if (StringUtils.isEmpty(startSs.getConf().getVar(
HiveConf.ConfVars.HIVESESSIONID))) {
startSs.getConf()
@@ -206,7 +214,21 @@
if (startSs.hiveHist == null) {
startSs.hiveHist = new HiveHistory(startSs);
}
-
+
+ if (startSs.getTmpOutputFile() == null) {
+ // per-session temp file containing results to be sent from HiveServer to HiveClient
+ File tmpDir = new File(
+ HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVEHISTORYFILELOC));
+ String sessionID = startSs.getConf().getVar(HiveConf.ConfVars.HIVESESSIONID);
+ try {
+ File tmpFile = File.createTempFile(sessionID, ".pipeout", tmpDir);
+ tmpFile.deleteOnExit();
+ startSs.setTmpOutputFile(tmpFile);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
try {
startSs.authenticator = HiveUtils.getAuthenticator(startSs
.getConf());
@@ -217,7 +239,7 @@
} catch (HiveException e) {
throw new RuntimeException(e);
}
-
+
return startSs;
}
@@ -586,7 +608,7 @@
}
return commandType.getOperationName();
}
-
+
public HiveOperation getHiveOperation() {
return commandType;
}
@@ -594,7 +616,7 @@
public void setCommandType(HiveOperation commandType) {
this.commandType = commandType;
}
-
+
public HiveAuthorizationProvider getAuthorizer() {
return authorizer;
}
@@ -610,7 +632,7 @@
public void setAuthenticator(HiveAuthenticationProvider authenticator) {
this.authenticator = authenticator;
}
-
+
public CreateTableAutomaticGrant getCreateTableGrants() {
return createTableGrants;
}
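
Finally, the per-session temp file that start() now creates follows the
standard createTempFile/deleteOnExit pattern; HIVEHISTORYFILELOC corresponds
to hive.querylog.location. A minimal sketch with placeholder directory and
session id:

    import java.io.File;
    import java.io.IOException;

    // Sketch of the per-session ".pipeout" file created in start(): named
    // after the session id, placed in the history-file directory, and
    // registered for deletion on JVM exit as a safety net besides clean().
    public class PipeoutFileSketch {
      public static void main(String[] args) throws IOException {
        File tmpDir = new File("/tmp");          // placeholder for HIVEHISTORYFILELOC
        String sessionID = "hive_20110210_0001"; // placeholder session id
        File tmpFile = File.createTempFile(sessionID, ".pipeout", tmpDir);
        tmpFile.deleteOnExit();
        System.out.println("temp output file: " + tmpFile.getAbsolutePath());
      }
    }
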