Index: build.xml =================================================================== --- build.xml (revision 1068272) +++ build.xml (working copy) @@ -111,7 +111,7 @@ - + @@ -122,7 +122,7 @@ - + @@ -133,7 +133,7 @@ - + Index: service/src/java/org/apache/hadoop/hive/service/HiveServer.java =================================================================== --- service/src/java/org/apache/hadoop/hive/service/HiveServer.java (revision 1068272) +++ service/src/java/org/apache/hadoop/hive/service/HiveServer.java (working copy) @@ -18,11 +18,17 @@ package org.apache.hadoop.hive.service; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; import java.io.IOException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; -import com.facebook.fb303.fb_status; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -48,6 +54,8 @@ import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportFactory; +import com.facebook.fb303.fb_status; + /** * Thrift Hive Server Implementation. */ @@ -63,8 +71,14 @@ /** * Hive server uses org.apache.hadoop.hive.ql.Driver for run() and * getResults() methods. + * It is the instance of the last Hive query. */ - private final Driver driver; + private Driver driver; + /** + * For processors other than Hive queries (Driver), they output to session.out (a temp file) + * first and the fetchOne/fetchN/fetchAll functions get the output from pipeIn. + */ + private BufferedReader pipeIn; /** * Flag that indicates whether the last executed command was a Hive query. @@ -80,14 +94,35 @@ super(HiveServer.class.getName()); isHiveQuery = false; + driver = null; SessionState session = new SessionState(new HiveConf(SessionState.class)); SessionState.start(session); - session.in = null; - session.out = null; - session.err = null; - driver = new Driver(); + setupSessionIO(session); } + private void setupSessionIO(SessionState session) { + try { + LOG.info("Putting temp output to file " + session.getTmpOutputFile().toString()); + session.in = null; // hive server's session input stream is not used + // open a per-session file in auto-flush mode for writing temp results + session.out = new PrintStream(new FileOutputStream(session.getTmpOutputFile()), true, "UTF-8"); + // TODO: for hadoop jobs, progress is printed out to session.err, + // we should find a way to feed back job progress to client + session.err = new PrintStream(System.err, true, "UTF-8"); + } catch (IOException e) { + LOG.error("Error in creating temp output file ", e); + try { + session.in = null; + session.out = new PrintStream(System.out, true, "UTF-8"); + session.err = new PrintStream(System.err, true, "UTF-8"); + } catch (UnsupportedEncodingException ee) { + ee.printStackTrace(); + session.out = null; + session.err = null; + } + } + } + /** * Executes a query. * @@ -96,7 +131,7 @@ */ public void execute(String cmd) throws HiveServerException, TException { HiveServerHandler.LOG.info("Running the query: " + cmd); - SessionState.get(); + SessionState session = SessionState.get(); String cmd_trimmed = cmd.trim(); String[] tokens = cmd_trimmed.split("\\s"); @@ -111,11 +146,14 @@ CommandProcessorResponse response = null; if (proc != null) { if (proc instanceof Driver) { - ((Driver)proc).destroy(); isHiveQuery = true; + driver = (Driver) proc; response = driver.run(cmd); } else { isHiveQuery = false; + driver = null; + // need to reset output for each non-Hive query + setupSessionIO(session); response = proc.run(cmd_1); } @@ -126,6 +164,7 @@ } catch (Exception e) { HiveServerException ex = new HiveServerException(); ex.setMessage("Error running query: " + e.toString()); + ex.setErrorCode(ret == 0? -10000: ret); throw ex; } @@ -136,13 +175,32 @@ } /** + * Should be called by the client at the end of a session. + */ + public void clean() { + if (driver != null) { + driver.close(); + driver.destroy(); + } + + SessionState session = SessionState.get(); + if (session.getTmpOutputFile() != null) { + session.getTmpOutputFile().delete(); + } + pipeIn = null; + } + + /** * Return the status information about the Map-Reduce cluster. */ public HiveClusterStatus getClusterStatus() throws HiveServerException, TException { HiveClusterStatus hcs; try { - ClusterStatus cs = driver.getClusterStatus(); + Driver drv = new Driver(); + drv.init(); + + ClusterStatus cs = drv.getClusterStatus(); JobTracker.State jbs = cs.getJobTrackerState(); // Convert the ClusterStatus to its Thrift equivalent: HiveClusterStatus @@ -181,6 +239,8 @@ return new Schema(); } + assert driver != null: "getSchema() is called on a Hive query and driver is NULL."; + try { Schema schema = driver.getSchema(); if (schema == null) { @@ -206,6 +266,8 @@ return new Schema(); } + assert driver != null: "getThriftSchema() is called on a Hive query and driver is NULL."; + try { Schema schema = driver.getThriftSchema(); if (schema == null) { @@ -222,6 +284,7 @@ } } + /** * Fetches the next row in a query result set. * @@ -231,9 +294,17 @@ public String fetchOne() throws HiveServerException, TException { if (!isHiveQuery) { // Return no results if the last command was not a Hive query - return ""; + List results = new ArrayList(1); + readResults(results, 1); + if (results.size() > 0) { + return results.get(0); + } else { // throw an EOF exception + throw new HiveServerException("OK", 0, ""); + } } + assert driver != null: "fetchOne() is called on a Hive query and driver is NULL."; + ArrayList result = new ArrayList(); driver.setMaxRows(1); try { @@ -243,7 +314,8 @@ // TODO: Cannot return null here because thrift cannot handle nulls // TODO: Returning empty string for now. Need to figure out how to // TODO: return null in some other way - return ""; + throw new HiveServerException("OK", 0, ""); + // return ""; } catch (IOException e) { HiveServerException ex = new HiveServerException(); ex.setMessage(e.getMessage()); @@ -251,7 +323,58 @@ } } + private void cleanTmpFile() { + if (pipeIn != null) { + long len = 0L, pos = 0L; + SessionState session = SessionState.get(); + File tmp = session.getTmpOutputFile(); + tmp.delete(); + pipeIn = null; + } + } + /** + * Reads the temporary results for non-Hive (non-Driver) commands to the + * resulting List of strings. + * @param results list of strings containing the results + * @param nLines number of lines read at once. If it is <= 0, then read all lines. + */ + private void readResults(List results, int nLines) { + + if (pipeIn == null) { + SessionState session = SessionState.get(); + File tmp = session.getTmpOutputFile(); + try { + pipeIn = new BufferedReader(new FileReader(tmp)); + } catch (FileNotFoundException e) { + LOG.error("File " + tmp + " not found. ", e); + return; + } + } + + boolean readAll = false; + + for (int i = 0; i < nLines || nLines <= 0; ++i) { + try { + String line = pipeIn.readLine(); + if (line == null) { + // reached the end of the result file + readAll = true; + break; + } else { + results.add(line); + } + } catch (IOException e) { + LOG.error("Reading temp results encountered an exception: ", e); + readAll = true; + } + } + if (readAll) { + cleanTmpFile(); + } + } + + /** * Fetches numRows rows. * * @param numRows @@ -270,12 +393,16 @@ ex.setMessage("Invalid argument for number of rows: " + numRows); throw ex; } + + ArrayList result = new ArrayList(); + if (!isHiveQuery) { - // Return no results if the last command was not a Hive query - return new ArrayList(); + readResults(result, numRows); + return result; } - ArrayList result = new ArrayList(); + assert driver != null: "fetchN() is called on a Hive query and driver is NULL."; + driver.setMaxRows(numRows); try { driver.getResults(result); @@ -298,13 +425,16 @@ * in the client. */ public List fetchAll() throws HiveServerException, TException { + + ArrayList rows = new ArrayList(); + ArrayList result = new ArrayList(); + if (!isHiveQuery) { - // Return no results if the last command was not a Hive query - return new ArrayList(); + // Return all results if numRows <= 0 + readResults(result, 0); + return result; } - ArrayList rows = new ArrayList(); - ArrayList result = new ArrayList(); try { while (driver.getResults(result)) { rows.addAll(result); @@ -337,6 +467,13 @@ @Override public QueryPlan getQueryPlan() throws HiveServerException, TException { QueryPlan qp = new QueryPlan(); + + if (!isHiveQuery) { + return qp; + } + + assert driver != null: "getQueryPlan() is called on a Hive query and driver is NULL."; + // TODO for now only return one query at a time // going forward, all queries associated with a single statement // will be returned in a single QueryPlan Index: service/src/gen/thrift/gen-py/hive_service/ThriftHive.py =================================================================== --- service/src/gen/thrift/gen-py/hive_service/ThriftHive.py (revision 1068272) +++ service/src/gen/thrift/gen-py/hive_service/ThriftHive.py (working copy) @@ -49,7 +49,10 @@ def getQueryPlan(self, ): pass + def clean(self, ): + pass + class Client(hive_metastore.ThriftHiveMetastore.Client, Iface): def __init__(self, iprot, oprot=None): hive_metastore.ThriftHiveMetastore.Client.__init__(self, iprot, oprot) @@ -278,7 +281,30 @@ raise result.ex raise TApplicationException(TApplicationException.MISSING_RESULT, "getQueryPlan failed: unknown result"); + def clean(self, ): + self.send_clean() + self.recv_clean() + def send_clean(self, ): + self._oprot.writeMessageBegin('clean', TMessageType.CALL, self._seqid) + args = clean_args() + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_clean(self, ): + (fname, mtype, rseqid) = self._iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(self._iprot) + self._iprot.readMessageEnd() + raise x + result = clean_result() + result.read(self._iprot) + self._iprot.readMessageEnd() + return + + class Processor(hive_metastore.ThriftHiveMetastore.Processor, Iface, TProcessor): def __init__(self, handler): hive_metastore.ThriftHiveMetastore.Processor.__init__(self, handler) @@ -290,6 +316,7 @@ self._processMap["getThriftSchema"] = Processor.process_getThriftSchema self._processMap["getClusterStatus"] = Processor.process_getClusterStatus self._processMap["getQueryPlan"] = Processor.process_getQueryPlan + self._processMap["clean"] = Processor.process_clean def process(self, iprot, oprot): (name, type, seqid) = iprot.readMessageBegin() @@ -418,7 +445,18 @@ oprot.writeMessageEnd() oprot.trans.flush() + def process_clean(self, seqid, iprot, oprot): + args = clean_args() + args.read(iprot) + iprot.readMessageEnd() + result = clean_result() + self._handler.clean() + oprot.writeMessageBegin("clean", TMessageType.REPLY, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + # HELPER FUNCTIONS AND STRUCTURES class execute_args: @@ -1361,3 +1399,85 @@ def __ne__(self, other): return not (self == other) + +class clean_args: + + thrift_spec = ( + ) + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('clean_args') + oprot.writeFieldStop() + oprot.writeStructEnd() + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class clean_result: + + thrift_spec = ( + ) + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('clean_result') + oprot.writeFieldStop() + oprot.writeStructEnd() + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) Index: service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote =================================================================== --- service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote (revision 1068272) +++ service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote (working copy) @@ -29,6 +29,7 @@ print ' Schema getThriftSchema()' print ' HiveClusterStatus getClusterStatus()' print ' QueryPlan getQueryPlan()' + print ' void clean()' print '' sys.exit(0) @@ -127,6 +128,12 @@ sys.exit(1) pp.pprint(client.getQueryPlan()) +elif cmd == 'clean': + if len(args) != 0: + print 'clean requires 0 args' + sys.exit(1) + pp.pprint(client.clean()) + else: print 'Unrecognized method %s' % cmd sys.exit(1) Index: service/src/gen/thrift/gen-cpp/ThriftHive.h =================================================================== --- service/src/gen/thrift/gen-cpp/ThriftHive.h (revision 1068272) +++ service/src/gen/thrift/gen-cpp/ThriftHive.h (working copy) @@ -23,6 +23,7 @@ virtual void getThriftSchema(Apache::Hadoop::Hive::Schema& _return) = 0; virtual void getClusterStatus(HiveClusterStatus& _return) = 0; virtual void getQueryPlan(Apache::Hadoop::Hive::QueryPlan& _return) = 0; + virtual void clean() = 0; }; class ThriftHiveNull : virtual public ThriftHiveIf , virtual public Apache::Hadoop::Hive::ThriftHiveMetastoreNull { @@ -52,6 +53,9 @@ void getQueryPlan(Apache::Hadoop::Hive::QueryPlan& /* _return */) { return; } + void clean() { + return; + } }; typedef struct _ThriftHive_execute_args__isset { @@ -836,6 +840,80 @@ }; + +class ThriftHive_clean_args { + public: + + ThriftHive_clean_args() { + } + + virtual ~ThriftHive_clean_args() throw() {} + + + bool operator == (const ThriftHive_clean_args & /* rhs */) const + { + return true; + } + bool operator != (const ThriftHive_clean_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const ThriftHive_clean_args & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class ThriftHive_clean_pargs { + public: + + + virtual ~ThriftHive_clean_pargs() throw() {} + + + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class ThriftHive_clean_result { + public: + + ThriftHive_clean_result() { + } + + virtual ~ThriftHive_clean_result() throw() {} + + + bool operator == (const ThriftHive_clean_result & /* rhs */) const + { + return true; + } + bool operator != (const ThriftHive_clean_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const ThriftHive_clean_result & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class ThriftHive_clean_presult { + public: + + + virtual ~ThriftHive_clean_presult() throw() {} + + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + +}; + class ThriftHiveClient : virtual public ThriftHiveIf, public Apache::Hadoop::Hive::ThriftHiveMetastoreClient { public: ThriftHiveClient(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) : @@ -872,6 +950,9 @@ void getQueryPlan(Apache::Hadoop::Hive::QueryPlan& _return); void send_getQueryPlan(); void recv_getQueryPlan(Apache::Hadoop::Hive::QueryPlan& _return); + void clean(); + void send_clean(); + void recv_clean(); }; class ThriftHiveProcessor : virtual public ::apache::thrift::TProcessor, public Apache::Hadoop::Hive::ThriftHiveMetastoreProcessor { @@ -888,6 +969,7 @@ void process_getThriftSchema(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot); void process_getClusterStatus(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot); void process_getQueryPlan(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot); + void process_clean(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot); public: ThriftHiveProcessor(boost::shared_ptr iface) : Apache::Hadoop::Hive::ThriftHiveMetastoreProcessor(iface), @@ -900,6 +982,7 @@ processMap_["getThriftSchema"] = &ThriftHiveProcessor::process_getThriftSchema; processMap_["getClusterStatus"] = &ThriftHiveProcessor::process_getClusterStatus; processMap_["getQueryPlan"] = &ThriftHiveProcessor::process_getQueryPlan; + processMap_["clean"] = &ThriftHiveProcessor::process_clean; } virtual bool process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot); @@ -1014,6 +1097,13 @@ } } + void clean() { + uint32_t sz = ifaces_.size(); + for (uint32_t i = 0; i < sz; ++i) { + ifaces_[i]->clean(); + } + } + }; }}} // namespace Index: service/src/gen/thrift/gen-cpp/ThriftHive_server.skeleton.cpp =================================================================== --- service/src/gen/thrift/gen-cpp/ThriftHive_server.skeleton.cpp (revision 1068272) +++ service/src/gen/thrift/gen-cpp/ThriftHive_server.skeleton.cpp (working copy) @@ -62,6 +62,11 @@ printf("getQueryPlan\n"); } + void clean() { + // Your implementation goes here + printf("clean\n"); + } + }; int main(int argc, char **argv) { Index: service/src/gen/thrift/gen-cpp/ThriftHive.cpp =================================================================== --- service/src/gen/thrift/gen-cpp/ThriftHive.cpp (revision 1068272) +++ service/src/gen/thrift/gen-cpp/ThriftHive.cpp (working copy) @@ -1391,6 +1391,129 @@ return xfer; } +uint32_t ThriftHive_clean_args::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t ThriftHive_clean_args::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("ThriftHive_clean_args"); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t ThriftHive_clean_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("ThriftHive_clean_pargs"); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t ThriftHive_clean_result::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t ThriftHive_clean_result::write(::apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("ThriftHive_clean_result"); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t ThriftHive_clean_presult::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + void ThriftHiveClient::execute(const std::string& query) { send_execute(query); @@ -1884,6 +2007,60 @@ throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "getQueryPlan failed: unknown result"); } +void ThriftHiveClient::clean() +{ + send_clean(); + recv_clean(); +} + +void ThriftHiveClient::send_clean() +{ + int32_t cseqid = 0; + oprot_->writeMessageBegin("clean", ::apache::thrift::protocol::T_CALL, cseqid); + + ThriftHive_clean_pargs args; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->flush(); + oprot_->getTransport()->writeEnd(); +} + +void ThriftHiveClient::recv_clean() +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE); + } + if (fname.compare("clean") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME); + } + ThriftHive_clean_presult result; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + return; +} + bool ThriftHiveProcessor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) { ::apache::thrift::protocol::TProtocol* iprot = piprot.get(); @@ -2167,5 +2344,32 @@ oprot->getTransport()->writeEnd(); } +void ThriftHiveProcessor::process_clean(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot) +{ + ThriftHive_clean_args args; + args.read(iprot); + iprot->readMessageEnd(); + iprot->getTransport()->readEnd(); + + ThriftHive_clean_result result; + try { + iface_->clean(); + } catch (const std::exception& e) { + ::apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("clean", ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->flush(); + oprot->getTransport()->writeEnd(); + return; + } + + oprot->writeMessageBegin("clean", ::apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->flush(); + oprot->getTransport()->writeEnd(); +} + }}} // namespace Index: service/src/gen/thrift/gen-rb/thrift_hive.rb =================================================================== --- service/src/gen/thrift/gen-rb/thrift_hive.rb (revision 1068272) +++ service/src/gen/thrift/gen-rb/thrift_hive.rb (working copy) @@ -139,6 +139,20 @@ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'getQueryPlan failed: unknown result') end + def clean() + send_clean() + recv_clean() + end + + def send_clean() + send_message('clean', Clean_args) + end + + def recv_clean() + result = receive_message(Clean_result) + return + end + end class Processor < ThriftHiveMetastore::Processor @@ -232,6 +246,13 @@ write_result(result, oprot, 'getQueryPlan', seqid) end + def process_clean(seqid, iprot, oprot) + args = read_args(iprot, Clean_args) + result = Clean_result.new() + @handler.clean() + write_result(result, oprot, 'clean', seqid) + end + end # HELPER FUNCTIONS AND STRUCTURES @@ -500,5 +521,35 @@ ::Thrift::Struct.generate_accessors self end + class Clean_args + include ::Thrift::Struct, ::Thrift::Struct_Union + + FIELDS = { + + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + + class Clean_result + include ::Thrift::Struct, ::Thrift::Struct_Union + + FIELDS = { + + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + end Index: service/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/service/ThriftHive.java =================================================================== --- service/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/service/ThriftHive.java (revision 1068272) +++ service/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/service/ThriftHive.java (working copy) @@ -46,6 +46,8 @@ public org.apache.hadoop.hive.ql.plan.api.QueryPlan getQueryPlan() throws HiveServerException, TException; + public void clean() throws TException; + } public interface AsyncIface extends org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore .AsyncIface { @@ -66,6 +68,8 @@ public void getQueryPlan(AsyncMethodCallback resultHandler) throws TException; + public void clean(AsyncMethodCallback resultHandler) throws TException; + } public static class Client extends org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Client implements TServiceClient, Iface { @@ -392,6 +396,38 @@ throw new TApplicationException(TApplicationException.MISSING_RESULT, "getQueryPlan failed: unknown result"); } + public void clean() throws TException + { + send_clean(); + recv_clean(); + } + + public void send_clean() throws TException + { + oprot_.writeMessageBegin(new TMessage("clean", TMessageType.CALL, ++seqid_)); + clean_args args = new clean_args(); + args.write(oprot_); + oprot_.writeMessageEnd(); + oprot_.getTransport().flush(); + } + + public void recv_clean() throws TException + { + TMessage msg = iprot_.readMessageBegin(); + if (msg.type == TMessageType.EXCEPTION) { + TApplicationException x = TApplicationException.read(iprot_); + iprot_.readMessageEnd(); + throw x; + } + if (msg.seqid != seqid_) { + throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, "clean failed: out of sequence response"); + } + clean_result result = new clean_result(); + result.read(iprot_); + iprot_.readMessageEnd(); + return; + } + } public static class AsyncClient extends org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.AsyncClient implements AsyncIface { public static class Factory implements TAsyncClientFactory { @@ -640,6 +676,34 @@ } } + public void clean(AsyncMethodCallback resultHandler) throws TException { + checkReady(); + clean_call method_call = new clean_call(resultHandler, this, protocolFactory, transport); + manager.call(method_call); + } + + public static class clean_call extends TAsyncMethodCall { + public clean_call(AsyncMethodCallback resultHandler, TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport) throws TException { + super(client, protocolFactory, transport, resultHandler, false); + } + + public void write_args(TProtocol prot) throws TException { + prot.writeMessageBegin(new TMessage("clean", TMessageType.CALL, 0)); + clean_args args = new clean_args(); + args.write(prot); + prot.writeMessageEnd(); + } + + public void getResult() throws TException { + if (getState() != State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + TMemoryInputTransport memoryTransport = new TMemoryInputTransport(getFrameBuffer().array()); + TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + (new Client(prot)).recv_clean(); + } + } + } public static class Processor extends org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Processor implements TProcessor { @@ -656,6 +720,7 @@ processMap_.put("getThriftSchema", new getThriftSchema()); processMap_.put("getClusterStatus", new getClusterStatus()); processMap_.put("getQueryPlan", new getQueryPlan()); + processMap_.put("clean", new clean()); } private Iface iface_; @@ -982,6 +1047,32 @@ } + private class clean implements ProcessFunction { + public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException + { + clean_args args = new clean_args(); + try { + args.read(iprot); + } catch (TProtocolException e) { + iprot.readMessageEnd(); + TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage()); + oprot.writeMessageBegin(new TMessage("clean", TMessageType.EXCEPTION, seqid)); + x.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + return; + } + iprot.readMessageEnd(); + clean_result result = new clean_result(); + iface_.clean(); + oprot.writeMessageBegin(new TMessage("clean", TMessageType.REPLY, seqid)); + result.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + } + + } + } public static class execute_args implements TBase, java.io.Serializable, Cloneable { @@ -5553,4 +5644,371 @@ } + public static class clean_args implements TBase, java.io.Serializable, Cloneable { + private static final TStruct STRUCT_DESC = new TStruct("clean_args"); + + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements TFieldIdEnum { +; + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, FieldMetaData> metaDataMap; + static { + Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + FieldMetaData.addStructMetaDataMap(clean_args.class, metaDataMap); + } + + public clean_args() { + } + + /** + * Performs a deep copy on other. + */ + public clean_args(clean_args other) { + } + + public clean_args deepCopy() { + return new clean_args(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof clean_args) + return this.equals((clean_args)that); + return false; + } + + public boolean equals(clean_args that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(clean_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + clean_args typedOther = (clean_args)other; + + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) { + default: + TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + validate(); + } + + public void write(TProtocol oprot) throws TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("clean_args("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + } + + } + + public static class clean_result implements TBase, java.io.Serializable, Cloneable { + private static final TStruct STRUCT_DESC = new TStruct("clean_result"); + + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements TFieldIdEnum { +; + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, FieldMetaData> metaDataMap; + static { + Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + FieldMetaData.addStructMetaDataMap(clean_result.class, metaDataMap); + } + + public clean_result() { + } + + /** + * Performs a deep copy on other. + */ + public clean_result(clean_result other) { + } + + public clean_result deepCopy() { + return new clean_result(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof clean_result) + return this.equals((clean_result)that); + return false; + } + + public boolean equals(clean_result that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(clean_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + clean_result typedOther = (clean_result)other; + + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) { + default: + TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + validate(); + } + + public void write(TProtocol oprot) throws TException { + oprot.writeStructBegin(STRUCT_DESC); + + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("clean_result("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + } + + } + } Index: service/src/gen/thrift/gen-php/hive_service/ThriftHive.php =================================================================== --- service/src/gen/thrift/gen-php/hive_service/ThriftHive.php (revision 1068272) +++ service/src/gen/thrift/gen-php/hive_service/ThriftHive.php (working copy) @@ -18,6 +18,7 @@ public function getThriftSchema(); public function getClusterStatus(); public function getQueryPlan(); + public function clean(); } class ThriftHiveClient extends ThriftHiveMetastoreClient implements ThriftHiveIf { @@ -448,6 +449,53 @@ throw new Exception("getQueryPlan failed: unknown result"); } + public function clean() + { + $this->send_clean(); + $this->recv_clean(); + } + + public function send_clean() + { + $args = new ThriftHive_clean_args(); + $bin_accel = ($this->output_ instanceof TProtocol::$TBINARYPROTOCOLACCELERATED) && function_exists('thrift_protocol_write_binary'); + if ($bin_accel) + { + thrift_protocol_write_binary($this->output_, 'clean', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite()); + } + else + { + $this->output_->writeMessageBegin('clean', TMessageType::CALL, $this->seqid_); + $args->write($this->output_); + $this->output_->writeMessageEnd(); + $this->output_->getTransport()->flush(); + } + } + + public function recv_clean() + { + $bin_accel = ($this->input_ instanceof TProtocol::$TBINARYPROTOCOLACCELERATED) && function_exists('thrift_protocol_read_binary'); + if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, 'ThriftHive_clean_result', $this->input_->isStrictRead()); + else + { + $rseqid = 0; + $fname = null; + $mtype = 0; + + $this->input_->readMessageBegin($fname, $mtype, $rseqid); + if ($mtype == TMessageType::EXCEPTION) { + $x = new TApplicationException(); + $x->read($this->input_); + $this->input_->readMessageEnd(); + throw $x; + } + $result = new ThriftHive_clean_result(); + $result->read($this->input_); + $this->input_->readMessageEnd(); + } + return; + } + } // HELPER FUNCTIONS AND STRUCTURES @@ -1700,4 +1748,104 @@ } +class ThriftHive_clean_args { + static $_TSPEC; + + + public function __construct() { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + ); + } + } + + public function getName() { + return 'ThriftHive_clean_args'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ThriftHive_clean_args'); + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + +class ThriftHive_clean_result { + static $_TSPEC; + + + public function __construct() { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + ); + } + } + + public function getName() { + return 'ThriftHive_clean_result'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ThriftHive_clean_result'); + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + ?> Index: service/if/hive_service.thrift =================================================================== --- service/if/hive_service.thrift (revision 1068272) +++ service/if/hive_service.thrift (working copy) @@ -81,4 +81,6 @@ # Get the queryplan annotated with counter information queryplan.QueryPlan getQueryPlan() throws(1:HiveServerException ex) + # clean up last Hive query (releasing locks etc.) + void clean() } Index: cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java =================================================================== --- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (revision 1068272) +++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (working copy) @@ -27,15 +27,14 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import jline.Completor; import jline.ArgumentCompletor; +import jline.ArgumentCompletor.AbstractArgumentDelimiter; import jline.ArgumentCompletor.ArgumentDelimiter; -import jline.ArgumentCompletor.AbstractArgumentDelimiter; +import jline.Completor; import jline.ConsoleReader; import jline.History; import jline.SimpleCompletor; @@ -45,18 +44,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.parse.ParseDriver; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; +import org.apache.hadoop.hive.ql.parse.ParseDriver; import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.service.HiveClient; +import org.apache.hadoop.hive.service.HiveServerException; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.thrift.TException; /** * CliDriver. @@ -64,8 +66,9 @@ */ public class CliDriver { - public static final String prompt = "hive"; - public static final String prompt2 = " "; // when ';' is not yet seen + public static String prompt = "hive"; + public static String prompt2 = " "; // when ';' is not yet seen + public static final int LINES_TO_FETCH = 40; // number of lines to fetch in batch from remote hive server public static final String HIVERCFILE = ".hiverc"; @@ -80,7 +83,7 @@ } public int processCmd(String cmd) { - SessionState ss = SessionState.get(); + CliSessionState ss = (CliSessionState) SessionState.get(); String cmd_trimmed = cmd.trim(); String[] tokens = cmd_trimmed.split("\\s+"); @@ -149,8 +152,50 @@ ss.out.println(StringUtils.join(s, "\n")); } } + } else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server + HiveClient client = ss.getClient(); + PrintStream out = ss.out; + PrintStream err = ss.err; - } else { + try { + client.execute(cmd_trimmed); + List results; + do { + results = client.fetchN(LINES_TO_FETCH); + for (String line: results) { + out.println(line); + } + } while (results.size() == LINES_TO_FETCH); + } catch (HiveServerException e) { + ret = e.getErrorCode(); + if (ret != 0) { // OK if ret == 0 -- reached the EOF + String errMsg = e.getMessage(); + if (errMsg == null) { + errMsg = e.toString(); + } + ret = e.getErrorCode(); + err.println("[Hive Error]: " + errMsg); + } + } catch (TException e) { + String errMsg = e.getMessage(); + if (errMsg == null) { + errMsg = e.toString(); + } + ret = -10002; + err.println("[Thrift Error]: " + errMsg); + } finally { + try { + client.clean(); + } catch (TException e) { + String errMsg = e.getMessage(); + if (errMsg == null) { + errMsg = e.toString(); + } + err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: " + + errMsg); + } + } + } else { // local mode CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf); if (proc != null) { if (proc instanceof Driver) { @@ -165,7 +210,7 @@ } ArrayList res = new ArrayList(); - + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) { // Print the column names boolean first_col = true; @@ -326,6 +371,7 @@ // as a keyword delimiter, we need to define a new ArgumentDelimiter // that recognizes parenthesis as a delimiter. ArgumentDelimiter delim = new AbstractArgumentDelimiter () { + @Override public boolean isDelimiterChar (String buffer, int pos) { char c = buffer.charAt(pos); return (Character.isWhitespace(c) || c == '(' || c == ')' || @@ -361,7 +407,7 @@ return ret; } }; - + return completor; } @@ -410,6 +456,17 @@ SessionState.start(ss); + // connect to Hive Server + if (ss.getHost() != null) { + ss.connect(); + if (ss.isRemoteMode()) { + prompt = "[" + ss.host + ':' + ss.port + "] " + prompt; + char[] spaces = new char[prompt.length()]; + Arrays.fill(spaces, ' '); + prompt2 = new String(spaces); + } + } + CliDriver cli = new CliDriver(); // Execute -i init files (always in silent mode) @@ -457,6 +514,8 @@ } } + ss.close(); + System.exit(ret); } Index: cli/src/java/org/apache/hadoop/hive/cli/OptionsProcessor.java =================================================================== --- cli/src/java/org/apache/hadoop/hive/cli/OptionsProcessor.java (revision 1068272) +++ cli/src/java/org/apache/hadoop/hive/cli/OptionsProcessor.java (working copy) @@ -39,7 +39,7 @@ /** * OptionsProcessor. - * + * */ public class OptionsProcessor { @@ -47,7 +47,7 @@ private final Parser parser = new Parser(); private final Option confOptions, initFilesOption, isSilentOption, - execOption, fileOption, isHelpOption; + execOption, fileOption, isHelpOption, hostOption, portOption; /** * Shamelessly cloned from Hadoop streaming take in multiple -hiveconf x=y parameters. @@ -151,7 +151,7 @@ isSilentOption = createBoolOption(builder, "silent", "S", "silent mode"); // -help - isHelpOption = createBoolOption(builder, "help", "h", "help"); + isHelpOption = createBoolOption(builder, "help", "H", "help"); // -hiveconf var=val confOptions = new MultiPropertyOption("-hiveconf", @@ -160,11 +160,22 @@ initFilesOption = new MultiPropertyOption( "-i", "File to run before other commands", 'I', false); + // -h + hostOption = createOptionWithArg(builder, "host", "h", + "connecting to Hive Server on host", + argBuilder.withMinimum(1).withMaximum(1).create()); + + // -p + portOption = createOptionWithArg(builder, "port", "p", + "connecting to Hive Server on port", + argBuilder.withMinimum(1).withMaximum(1).create()); + new PropertyOption(); Group allOptions = new GroupBuilder().withOption(confOptions).withOption(initFilesOption) .withOption(isSilentOption).withOption(isHelpOption) - .withOption(execOption).withOption(fileOption).create(); + .withOption(execOption).withOption(fileOption).withOption(hostOption) + .withOption(portOption).create(); parser.setGroup(allOptions); } @@ -201,11 +212,16 @@ if (null != initFiles) { ss.initFiles = initFiles; } - // -h + // -H if (cmdLine.hasOption(isHelpOption)) { printUsage(null); return false; } + + // -h, -p + ss.host = (String) cmdLine.getValue(hostOption); + ss.port = Integer.parseInt((String) cmdLine.getValue(portOption, "10000")); + if (ss.execString != null && ss.fileName != null) { printUsage("-e and -f option cannot be specified simultaneously"); return false; @@ -235,9 +251,14 @@ System.err.println(" -e 'quoted query string' Sql from command line"); System.err.println(" -f Sql from files"); System.err.println(" -S Silent mode in interactive shell"); + System.err.println(" -h host name that running Hive server (remote mode only)"); + System.err.println(" -p port number that Hive server is listening to, default is 10000 (remote mode only)"); + System.err.println(" -H this help message"); System.err.println(""); System.err.println("-e and -f cannot be specified together. In the absence of these"); System.err.println(" options, interactive shell is started."); + System.err.println(" By default Hive CLI is running in local mode, if -h is specified it will be"); + System.err.println(" running in remote mode, which try to connect to the Hive server on remote host."); System.err.println(""); } Index: cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java =================================================================== --- cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java (revision 1068272) +++ cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java (working copy) @@ -24,10 +24,17 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.service.HiveClient; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; /** * CliSessionState. - * + * */ public class CliSessionState extends SessionState { /** @@ -50,12 +57,65 @@ */ public List initFiles = new ArrayList(); + /** + * host name and port number of remote Hive server + */ + protected String host; + protected int port; + + private boolean remoteMode; + + private TTransport transport; + private HiveClient client; + public CliSessionState() { super(); + remoteMode = false; } public CliSessionState(HiveConf conf) { super(conf); + remoteMode = false; } + /** + * Connect to Hive Server + */ + public void connect() throws TTransportException { + transport = new TSocket(host, port); + TProtocol protocol = new TBinaryProtocol(transport); + client = new HiveClient(protocol); + transport.open(); + remoteMode = true; + } + + public void setHost(String host) { + this.host = host; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public void close() { + try { + client.clean(); + client.shutdown(); + transport.close(); + } catch (TException e) { + e.printStackTrace(); + } + } + + public boolean isRemoteMode() { + return remoteMode; + } + + public HiveClient getClient() { + return client; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (revision 1068272) +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (working copy) @@ -72,6 +72,7 @@ * HiveHistory Object */ protected HiveHistory hiveHist; + /** * Streams to read/write from. */ @@ -80,16 +81,22 @@ public PrintStream err; /** + * Temporary file name used to store results of non-Hive commands (e.g., set, dfs) + * and HiveServer.fetch*() function will read results from this file + */ + protected File tmpOutputFile; + + /** * type of the command. */ private HiveOperation commandType; - + private HiveAuthorizationProvider authorizer; - + private HiveAuthenticationProvider authenticator; - + private CreateTableAutomaticGrant createTableGrants; - + /** * Lineage state. */ @@ -112,6 +119,14 @@ this.conf = conf; } + public File getTmpOutputFile() { + return tmpOutputFile; + } + + public void setTmpOutputFile(File f) { + tmpOutputFile = f; + } + public boolean getIsSilent() { if(conf != null) { return conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT); @@ -161,29 +176,22 @@ /** * start a new session and set it to current session. - * @throws HiveException */ - public static SessionState start(HiveConf conf) throws HiveException { + public static SessionState start(HiveConf conf) { SessionState ss = new SessionState(conf); - ss.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId()); - ss.hiveHist = new HiveHistory(ss); - ss.authenticator = HiveUtils.getAuthenticator(conf); - ss.authorizer = HiveUtils.getAuthorizeProviderManager( - conf, ss.authenticator); - ss.createTableGrants = CreateTableAutomaticGrant.create(conf); - tss.set(ss); - return (ss); + return start(ss); } /** * set current session to existing session object if a thread is running * multiple sessions - it must call this method with the new session object * when switching from one session to another. - * @throws HiveException + * @throws HiveException */ public static SessionState start(SessionState startSs) { tss.set(startSs); + if (StringUtils.isEmpty(startSs.getConf().getVar( HiveConf.ConfVars.HIVESESSIONID))) { startSs.getConf() @@ -193,7 +201,21 @@ if (startSs.hiveHist == null) { startSs.hiveHist = new HiveHistory(startSs); } - + + if (startSs.getTmpOutputFile() == null) { + // per-session temp file containing results to be sent from HiveServer to HiveClient + File tmpDir = new File( + HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVEHISTORYFILELOC)); + String sessionID = startSs.getConf().getVar(HiveConf.ConfVars.HIVESESSIONID); + try { + File tmpFile = File.createTempFile(sessionID, ".pipeout", tmpDir); + tmpFile.deleteOnExit(); + startSs.setTmpOutputFile(tmpFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + try { startSs.authenticator = HiveUtils.getAuthenticator(startSs .getConf()); @@ -204,7 +226,7 @@ } catch (HiveException e) { throw new RuntimeException(e); } - + return startSs; } @@ -573,7 +595,7 @@ } return commandType.getOperationName(); } - + public HiveOperation getHiveOperation() { return commandType; } @@ -581,7 +603,7 @@ public void setCommandType(HiveOperation commandType) { this.commandType = commandType; } - + public HiveAuthorizationProvider getAuthorizer() { return authorizer; } @@ -597,7 +619,7 @@ public void setAuthenticator(HiveAuthenticationProvider authenticator) { this.authenticator = authenticator; } - + public CreateTableAutomaticGrant getCreateTableGrants() { return createTableGrants; }