Index: bin/ext/hivehaserver.sh =================================================================== --- bin/ext/hivehaserver.sh (revision 0) +++ bin/ext/hivehaserver.sh (revision 0) @@ -0,0 +1,39 @@ + +THISSERVICE=hivehaserver +export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} " + +hivehaserver() { + echo "Starting Hive Thrift Server" + CLASS=org.apache.hadoop.hive.service.HaHiveServer + if $cygwin; then + HIVE_LIB=`cygpath -w "$HIVE_LIB"` + fi + JAR=${HIVE_LIB}/hive-service-*.jar + + version=$($HADOOP version | awk '{if (NR == 1) {print $2;}}'); + + # Save the regex to a var to workaround quoting incompatabilities + # between Bash 3.1 and 3.2 + version_re="^([[:digit:]]+)\.([[:digit:]]+)(\.([[:digit:]]+))?.*$" + + if [[ "$version" =~ $version_re ]]; then + major_ver=${BASH_REMATCH[1]} + minor_ver=${BASH_REMATCH[2]} + patch_ver=${BASH_REMATCH[4]} + else + echo "" + fi + +# if [ $minor_ver -lt 20 ]; then +# exec ${$HADOOP} start jar $AUX_JARS_CMD_LINE $JAR $CLASS $HIVE_OPTS "$@" +# else + # hadoop 20 or newer - skip the aux_jars option and hiveconf + exec $HADOOP jar $JAR $CLASS $HIVE_OPTS "$@" + # fi +} + +hiveserver_help() { + echo "usage HIVE_PORT=xxxx ./hive --service hiveserver" + echo " HIVE_PORT : Specify the server port" +} + Index: common/src/java/conf/hive-log4j.properties =================================================================== --- common/src/java/conf/hive-log4j.properties (revision 1151733) +++ common/src/java/conf/hive-log4j.properties (working copy) @@ -44,6 +44,28 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +# +# Rolling File Appender - HA Framework +# +log4j.logger.haframework=INFO, HA +log4j.logger.org.I0Itec.zkclient=INFO, HA +log4j.logger.org.apache.zookeeper=INFO, HA + +log4j.appender.HA=org.apache.log4j.RollingFileAppender +log4j.appender.HA.File=${hive.log.dir}/HA_${hive.log.file} + +# Logfile size and and 10 backups +log4j.appender.HA.MaxFileSize=5MB +log4j.appender.HA.MaxBackupIndex=100 + +log4j.appender.HA.layout=org.apache.log4j.PatternLayout +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.HA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n +# Debugging Pattern format +log4j.appender.HA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + #custom logging levels #log4j.logger.xxx=DEBUG Index: common/src/java/org/apache/hadoop/hive/common/client/ActiveServerConnection.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/client/ActiveServerConnection.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/client/ActiveServerConnection.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.client; + +public class ActiveServerConnection +{ + private long version; + + private Object metadata; + + public long getVersion () + { + return version; + } + + public void setVersion () + { + this.version = System.nanoTime (); + } + + public Object getMetadata () + { + return metadata; + } + + public void setMetadata ( Object metadata ) + { + this.metadata = metadata; + } + + // subbu:try to remove this now ? + public void close () + { + // do nothing + } + +} Index: common/src/java/org/apache/hadoop/hive/common/client/HAConnector.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/client/HAConnector.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/client/HAConnector.java (revision 0) @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.client; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public abstract class HAConnector +{ + public static final String ACTIVE_SERVER_FINDER = "ACTIVE_SERVER_FINDER"; + + protected ActiveServerConnection connection; + + private Object masterLock = new Object (); + + private ActiveInstanceFinder activeServerFinder = null; + + private CountDownLatch delayLatch = null; + + final ReadWriteLock readWriteLock = new ReentrantReadWriteLock (); + + private static final Log LOG = LogFactory.getLog ( HAConnector.class ); + + private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled (); + + public abstract void initServerStatusListener ( List < Object > serverAddresses ); + + public abstract void notifyListenerServerAvailable ( ActiveServerConnection connection + +); + + public abstract void notifyListenerServerUnavailable ( ActiveServerConnection connection + +); + + public ActiveServerConnection getServerMetadata () throws ServerUnavailableException + { + if ( null == connection ) + { + throw new ServerUnavailableException ( "Server connection is unavailable" ); + } + + return connection; + } + + public void reportServerUnAvailable ( ActiveServerConnection existingConnection, + +RetryRule rule ) + { + if ( null != existingConnection && null != connection ) + { + if ( existingConnection.getVersion () < connection.getVersion () ) + { + // Already new version of connection is available + return; + } + } + + synchronized ( masterLock ) + { + if ( null == activeServerFinder ) + { + if ( null != connection ) + { + // Making an null check because during startup existing connection may be null + notifyListenerServerUnavailable ( connection ); + } + delayLatch = new CountDownLatch( 1 ); + activeServerFinder = new ActiveInstanceFinder ( this, delayLatch, readWriteLock ); + activeServerFinder.setDaemon(true); + activeServerFinder.start (); + + try + { + delayLatch.await (); + } + catch ( InterruptedException e ) + { + LOG.error ( "Client Thread interrupted", e ); + } + } + } + + if ( rule.getModes ().equals ( RetryModes.WAIT_MODE ) ) + { + Lock readLock = readWriteLock.readLock (); + boolean isLockAcquired = false; + try + { + if ( DEBUG_ENABLED ) + { + LOG.debug ( "Client thread :" + Thread.currentThread ().getId () + + " starting to wait for the server connection for a period of :" + + rule.getAwaitTimeout () + " secs" ); + } + // after waiting for time out period can exit + isLockAcquired = readLock.tryLock ( rule.getAwaitTimeout (), + +TimeUnit.SECONDS ); + } + catch ( InterruptedException e ) + { + LOG.error ( "Retry Thread interrupted", e ); + } + finally + { + if ( isLockAcquired ) + { + readLock.unlock (); + } + } + if ( DEBUG_ENABLED ) + { + LOG.debug ( "Client thread :" + Thread.currentThread ().getId () + + " returning after wait period for the server connection" ); + } + return; + } + else + { + // if non wait mode can exit immediately + return; + } + } + + public abstract ActiveServerConnection connect () throws Exception; + + public void setConnection ( ActiveServerConnection activeConnection ) + { + synchronized ( masterLock ) + { + // server available plugged in + notifyListenerServerAvailable ( activeConnection ); + this.connection = activeConnection; + activeServerFinder = null; + } + } + + public void setActiveConnection ( ActiveServerConnection activeConnection ) + { + synchronized ( masterLock ) + { + // Current connection can also be null. + //if both current and new connections are same then no need to change the connections. + if ( null != connection && activeConnection.getMetadata().equals(connection.getMetadata()) ) + { + // increment the time stamp to notify clients that there's new connection + // after successful connection obtained. + connection.setVersion(); + + // server available notify with old connection. + notifyListenerServerAvailable ( connection ); + + // close the new connection. + activeConnection.close(); + activeServerFinder = null; + return; + } + if(null != connection) + { + // Making an null check because during startup existing connection may be null + // close existing conenction + connection.close (); + } + setConnection(activeConnection); + } + } + + + //This method is currently called only by DFSClientHAConnector + //and JobClientHAConnector, + //It is added as part of defect DC-625 + protected void destroy() + { + ActiveInstanceFinder tempActiveInstanceFinder = activeServerFinder; + if (null != tempActiveInstanceFinder) + { + tempActiveInstanceFinder.stopFinder(); + try { + tempActiveInstanceFinder.join(); + } catch (InterruptedException e) { + //no exception must be thrown here. + } + activeServerFinder = null; + } + } + +} + + + +class ActiveInstanceFinder extends Thread +{ + private final HAConnector haConnector; + + private static final Log LOG = LogFactory.getLog ( ActiveInstanceFinder.class ); + + private CountDownLatch delayLatch; + + private ReadWriteLock readWriteLock; + + private boolean stop; + + ActiveInstanceFinder ( HAConnector haConnector, CountDownLatch delayLatch, + ReadWriteLock readWriteLock ) + { + super(HAConnector.ACTIVE_SERVER_FINDER); + this.haConnector = haConnector; + this.delayLatch = delayLatch; + this.readWriteLock = readWriteLock; + } + + public void stopFinder(){ + this.stop = true; + } + + public void run () + { + this.stop = false; + LOG.info ( "Retry operation started in back ground:"); + ActiveServerConnection connection = null; + Lock writeLock = readWriteLock.writeLock (); + try + { + writeLock.lock (); + + delayLatch.countDown (); + + while ( !stop ) + { + try + { + connection = this.haConnector.connect (); + if ( null != connection ) + { + this.haConnector.setActiveConnection ( connection ); + break; + } + } + catch ( Exception e ) + { + if ( LOG.isDebugEnabled () ) + { + LOG.debug ( "Exception occured while retrying:" + e.getMessage () ,e); + } + } + } + } + finally + { + writeLock.unlock (); + } + LOG.info ( "Retry operation completed in back ground." ); + } +} + + + +class NotifiableLockObject +{ + private boolean isNotified = false; + + /** + * @return + */ + public boolean isNotified () + { + return isNotified; + } + + /** + * @param isNotified + */ + public void setNotified ( boolean isNotified ) + { + this.isNotified = isNotified; + } +} Index: common/src/java/org/apache/hadoop/hive/common/client/RetryModes.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/client/RetryModes.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/client/RetryModes.java (revision 0) @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.client; + +public enum RetryModes +{ + WAIT_MODE , + NON_WAIT_MODE ; +} Index: common/src/java/org/apache/hadoop/hive/common/client/RetryRule.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/client/RetryRule.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/client/RetryRule.java (revision 0) @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.client; + +public class RetryRule +{ + private RetryModes modes; + + private Long awaitTimeout; + + public RetryRule ( RetryModes modesThat, Long awaitTimeoutThat ) + { + this.modes = modesThat; + this.awaitTimeout = awaitTimeoutThat; + } + + /** + * @return RetryModes + */ + public RetryModes getModes () + { + return modes; + } + + /** + * @return awaitTimeOut + */ + public Long getAwaitTimeout () + { + return awaitTimeout; + } +} Index: common/src/java/org/apache/hadoop/hive/common/client/ServerUnavailableException.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/client/ServerUnavailableException.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/client/ServerUnavailableException.java (revision 0) @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.client; + +public class ServerUnavailableException extends Exception +{ + private static final long serialVersionUID = -964356566606864942L; + + public ServerUnavailableException ( String msg ) + { + super ( msg ); + } +} Index: common/src/java/org/apache/hadoop/hive/common/HAThreadGroup.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/HAThreadGroup.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/HAThreadGroup.java (revision 0) @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +public class HAThreadGroup extends ThreadGroup +{ + + private Throwable throwable; + + /** + * @param name + */ + public HAThreadGroup ( String name ) + { + super ( name ); + } + + /* + * (non-Javadoc) + * + * @see java.lang.ThreadGroup#uncaughtException(java.lang.Thread, java.lang.Throwable) + */ + @Override + public void uncaughtException ( Thread t, Throwable e ) + { + this.throwable = e; + + } + + /** + * This method is used to get the received exception from the thread. + * + * @return {@link Throwable} + */ + public Throwable getException () + { + return throwable; + } +} Index: common/src/java/org/apache/hadoop/hive/common/HiveConfUtils.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/HiveConfUtils.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/HiveConfUtils.java (revision 0) @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * For static loading of the HiveConf + */ +public class HiveConfUtils { + + public static HiveConf hiveConf = new HiveConf(Configuration.class); + +} Index: common/src/java/org/apache/hadoop/hive/common/LogUtility.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/LogUtility.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/LogUtility.java (revision 0) @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +import java.net.URL; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.PropertyConfigurator; + +public class LogUtility +{ + private static final Log LOG = LogFactory.getLog ( LogUtility.class.getName () ); + + private static final String HIVE_L4J = "hive-log4j.properties"; + + public static void initHiveLog4j () + { + // allow hive log4j to override any normal initialized one + URL hive_l4j = LogUtility.class.getClassLoader ().getResource ( HIVE_L4J ); + if ( hive_l4j == null ) + { + LOG.warn ( "Unable to load " + HIVE_L4J +". The file should be available in classpath. " ); + } + else + { + LogManager.resetConfiguration (); + PropertyConfigurator.configure ( hive_l4j ); + } + } + +} Index: common/src/java/org/apache/hadoop/hive/common/PropertyReader.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/PropertyReader.java (revision 0) +++ common/src/java/org/apache/hadoop/hive/common/PropertyReader.java (revision 0) @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class PropertyReader +{ + private static final Log LOG = LogFactory.getLog ( PropertyReader.class ); + + public static Properties getProperties ( String propertyFile ) + { + InputStream resourceAsStream = PropertyReader.class.getClassLoader ().getResourceAsStream ( + propertyFile ); + + Properties p = new Properties (); + + if ( null == resourceAsStream ) + { + throw new IllegalArgumentException ( "The property file : " + propertyFile + + " provided is not available." ); + } + + try + { + p.load ( resourceAsStream ); + } + catch ( IOException e ) + { + LOG.error ( "Exception while reading properties from :" + propertyFile ); + } + finally + { + try + { + // null is already handled above. + resourceAsStream.close (); + } + catch ( IOException e ) + { + LOG.error ( "Exception while closing the stream :" ,e); + } + } + return p; + } +} Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1151733) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -457,7 +457,7 @@ HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false), HIVE_REWORK_MAPREDWORK("hive.rework.mapredwork", false), HIVE_CONCATENATE_CHECK_INDEX ("hive.exec.concatenate.check.index", true), - ; + HIVEJOBSPATH("hive.jobs.path", "/tmp/" + System.getProperty("user.name") + "/hive-jobs/"); public final String varname; public final String defaultVal; Index: conf/ha-hive-site.xml =================================================================== --- conf/ha-hive-site.xml (revision 0) +++ conf/ha-hive-site.xml (revision 0) @@ -0,0 +1,50 @@ + + + + + + + + + hive.servers + + + The active and standby hive server ip address and port. + Only when a minimum of two servers are configured HA mode will be + considered. + example : jdbc:hive://10.18.52.138:10000/default,jdbc:hive://10.18.52.116:10000/default + + + + + + hive.query.execution.timeout + 0 + + After specified time it will kill the query execution. This is applicable per query. + The value entered here will be considered as milliseconds. + The maximum value is Integer.MAX(2147483647) value. + Value 0 is considered as infinite. + + + + + hive.connection.maxRetries + 3 + + The number of maximum retries clients try to connect to one + hiveserver after which they try to switch over and connect to the other + available hiveserver. Optional Property with default value as 3. + + + + + hive.await.timeout + 120 + + The connection timeout for the active ha hive connection. + The value should be configured in seconds. It is an optional property with default value 120 secs. + + + + Index: conf/ha-hive.properties =================================================================== --- conf/ha-hive.properties (revision 0) +++ conf/ha-hive.properties (revision 0) @@ -0,0 +1,73 @@ +######################################################## +#### Configurations for the HA-Common framework #### +######################################################## + +# The class name of local resource manager to be implemented for the HA switching +# This should implement the interface org.apache.hadoop.hive.service.HiveLRM +# eg: lrm.impl=org.apache.hadoop.hive.service.HiveLRM +lrm.impl=org.apache.hadoop.hive.service.HiveLRM + +#Specifies the Jmx port to initialize the Jmx service, +#that establishes the communication among HA servers. +#Example : 4444 - Should give the valid port number +ha.jmx.port=4444 + +#Specifies the JMX Connector Server Port. [ This is the server port opened by JMXConnectorServer , if not +#configured a Random Port will be considered.] +#Eg : ha.jmx.connector.server.port=9999 +ha.jmx.connector.server.port=9999 + +######################################################## +#### Configurations for the Zookeeper client #### +######################################################## + +# The address in form of the ipaddress and port where the zookeeper is running. +# This should be a valid ipaddress and port. +# This address configured should be same for both active and standby hive servers. +# zookeeper.servers=:,:,: +# eg: zk.address=10.18.52.25:2181,10.18.52.26:2181,10.18.52.27:2181 +zk.address= + +# The value of the zookeeper persisted node . +# This path configured should be same for both active and standby hive servers. +# eg: zk.root.path=/hadoop/ha +zk.root.path=/hadoop/ha + +# Zookeeper client session timeout in milliseconds. Default timeout is 20000 milliseconds. +# Minimum value is 20 seconds. This value is used by the zookeeper cluster to determine when the +# client's session expires. Expirations happens when the cluster does not hear from the +# client within the specified session timeout period (i.e. no heartbeat). At session expiration +# the cluster will delete all ephemeral nodes owned by that session and notifies +# all connected clients. +# Should be tuned according to the network ping time of the ZK cluster and number of ZK nodes in the cluster. +zk.session.timeout=20000 + +# Zookeeper client connection timeout in milliseconds. Default timeout is 60000 milliseconds. +# Minimum value is 60 seconds. Specifies the maximum time that the client waits to establish a +# connection to Zookeeper. +# Should be tuned according to the network ping time of the ZK cluster and number of ZK nodes in the cluster. +zk.connection.timeout=60000 + + + +######################################################## +#### Configurations for the hive server#### +######################################################## +#Specifies the port for network socket communication. +#Network socket communication is required to copy the database +#from active server to standby server. +#Example : 6666 - Should give the valid port number +hive.ha.socket.port=6666 + +#Specifies the slave port to start the standby server. +#Example : 8888 - Should give the valid port number +hive.ha.slave.port=8888 + +#Specifies the IP address of the machine where the hive server is getting started. +#Example :10.10.10.10 - Should give the valid ipaddress +hive.ha.ipaddress= + +#Active HiveServer - While synchronizing active metadata with standby, +#maximum this much time the active metadata DB will not serve any operations. +#Time in seconds +hive.freeze.timeout=120 Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java =================================================================== --- jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java (revision 1151733) +++ jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.jdbc; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.service.HiveClient; import org.apache.hadoop.hive.service.HiveInterface; @@ -58,7 +59,14 @@ private SQLWarning warningChain = null; private static final String URI_PREFIX = "jdbc:hive://"; + public static final String HA_HIVE_SITE_XML = "ha-hive-site.xml"; + static { + Configuration config = new Configuration(); + config.addResource(HA_HIVE_SITE_XML); + } + + /** * TODO: - parse uri (use java.net.URI?). */ Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDriver.java =================================================================== --- jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDriver.java (revision 1151733) +++ jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDriver.java (working copy) @@ -101,7 +101,7 @@ } public Connection connect(String url, Properties info) throws SQLException { - return new HiveConnection(url, info); + return new HiveHAConnection().getHiveConnection(url, info); } /** Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnection.java =================================================================== --- jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnection.java (revision 0) +++ jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnection.java (revision 0) @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.jdbc; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hive.common.client.ActiveServerConnection; +import org.apache.hadoop.hive.common.client.HAConnector; +import org.apache.hadoop.hive.common.client.ServerUnavailableException; +import org.apache.hadoop.hive.jdbc.util.HAClientUtil; + +public class HiveHAConnection +{ + private static final String OPEN_BRACE = "["; + + private static final String CLOSE_BRACE = "]"; + + private static final String HIVE_CONNECTION_IN_NON_HA_MODE = "HiveClient is in Non-HA mode, server url : "; + + private static final String HIVE_CONNECTION_IN_HA_MODE = "HiveClient is in HA mode, server urls : "; + + private boolean isConfigValid = false; + + private Configuration config; + + private long maxRetryTime = 0; + + private static final Log LOG = LogFactory.getLog ( HiveHAConnection.class.getName () ); + + public Connection getHiveConnection ( String url, Properties info ) throws SQLException + { + Connection connection = null; + if ( isHAMode () ) + { + LOG.info ( HIVE_CONNECTION_IN_HA_MODE + OPEN_BRACE + + config.get ( HAClientUtil.HIVE_SERVERS ) + CLOSE_BRACE ); + connection = getHAConnection (); + } + else + { + LOG.info ( HIVE_CONNECTION_IN_NON_HA_MODE + OPEN_BRACE + url + CLOSE_BRACE ); + connection = HAClientUtil.getConnection ( url, info ); + } + return connection; + } + + private boolean isHAMode () + { + if ( false == isConfigValid ) + { + config = HAClientUtil.getConfig (); + isConfigValid = HAClientUtil.isValidHAConfig ( config ); + maxRetryTime = HAClientUtil.getRetryValue ( config, HAClientUtil.HIVE_CON_AWAIT_TIME, + HAClientUtil.HIVE_CON_AWAIT_TIME_DEFAULT ); + + } + return isConfigValid; + } + + // ------------------ GET HA HIVE CONNECTION ---------------------------- + private Connection getHAConnection () throws SQLException + { + String hiveServers = config.get ( HAClientUtil.HIVE_SERVERS ); + HAConnector connector = HiveHAConnector.getInstance ( config ); + ActiveServerConnection activeServerMetadata = null; + try + { + activeServerMetadata = connector.getServerMetadata (); + if ( null != activeServerMetadata ) + { + // GET ACTIVE SERVER CONNECTION + logRetryOperation (); + String url = ( String ) activeServerMetadata.getMetadata (); + long activeServerStartTime = System.currentTimeMillis (); + try + { + return HAClientUtil.getConnection ( url, null ); + } + catch ( SQLException e ) + { + long activeServerEndTime = System.currentTimeMillis (); + long totalTime = ( activeServerEndTime - activeServerStartTime ) / 1000; + if ( totalTime > maxRetryTime ) + { + throw new SQLException ( + "Unable to get HiveConnection in HA-Mode for the following configurations [ha-hive-site.xml - hive.servers] : " + + hiveServers, e ); + } + HAClientUtil.startRetryOperation ( connector, activeServerMetadata, + ( maxRetryTime - totalTime ), hiveServers ); + } + } + } + catch ( ServerUnavailableException e ) + { + logRetryOperation (); + HAClientUtil.startRetryOperation ( connector, activeServerMetadata, maxRetryTime, + hiveServers ); + } + return HAClientUtil.getActualHiveConnection ( connector, hiveServers ); + } + + private void logRetryOperation () + { + LOG.info ( "Trying to get HiveConnection for ACTIVE HiveServer, maximum waiting time : " + + maxRetryTime + " seconds" ); + } +} Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnector.java =================================================================== --- jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnector.java (revision 0) +++ jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnector.java (revision 0) @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.jdbc; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hive.common.client.ActiveServerConnection; +import org.apache.hadoop.hive.common.client.HAConnector; +import org.apache.hadoop.hive.common.client.RetryRule; +import org.apache.hadoop.hive.common.client.ServerUnavailableException; +import org.apache.hadoop.hive.jdbc.util.HAClientUtil; + +public class HiveHAConnector extends HAConnector +{ + private static final long ONE_SECOND_WAIT = 1000; + + private static final String HIVE_CONNECTION_RETRY_ON = "HiveConnection retry operation failed on URL : "; + + private static final String UNABLE_FIND_THE_ACTIVE_SERVER = "Unable to find the active HiveServer"; + + private static volatile HiveHAConnector instance = null; + + private Queue < String > urlQueue; + + private Configuration configuration; + + private long noOfRetries = 0; + + private static final Log LOG = LogFactory.getLog ( HiveHAConnector.class.getName () ); + + public static HAConnector getInstance ( Configuration configuration ) + { + if ( null == instance ) + { + synchronized ( HiveHAConnector.class ) + { + if ( null == instance ) + { + instance = new HiveHAConnector ( configuration ); + } + } + } + return instance; + } + + private HiveHAConnector ( Configuration configuration ) + { + this.configuration = configuration; + noOfRetries = HAClientUtil.getRetryValue ( configuration, HAClientUtil.HIVE_CON_NO_RETRIES, + HAClientUtil.HIVE_CON_NO_RETRIES_DEFAULT ); + composeHiveUrls (); + } + + public ActiveServerConnection connect () + { + ActiveServerConnection connection = null; + + // RETRY LOGIC + // ------------------------------------------------------------------------- + String serverUrl = urlQueue.peek (); + for ( int retryCount = 0; retryCount < noOfRetries; retryCount++ ) + { + LOG.info ( "HiveConnection is retrying on url : " + serverUrl + ", retryCount : " + + retryCount ); + if ( isHiveServerActive ( serverUrl ) ) + { + connection = new ActiveServerConnection (); + connection.setMetadata ( serverUrl ); + return connection; + } + HAClientUtil.waitBetweenRetries ( ONE_SECOND_WAIT ); + } + serverUrl = urlQueue.poll (); + urlQueue.add ( serverUrl ); + throw new RuntimeException ( UNABLE_FIND_THE_ACTIVE_SERVER ); + } + + private boolean isHiveServerActive ( String url ) + { + Connection conn = null; + try + { + conn = HAClientUtil.getConnection ( url, null ); + return true; + } + catch ( SQLException e ) + { + if ( LOG.isDebugEnabled () ) + { + LOG.debug ( HIVE_CONNECTION_RETRY_ON + url + ", " + e.getMessage () ); + } + } + finally + { + HAClientUtil.closeConnection ( conn ); + } + return false; + } + + private void composeHiveUrls () + { + String [] servers = configuration.getStrings ( HAClientUtil.HIVE_SERVERS ); + int nonEmptyLength = 0; + for ( int nonEmptyUrlCount = 0; nonEmptyUrlCount < servers.length; nonEmptyUrlCount++ ) + { + if ( 0 != servers[nonEmptyUrlCount].trim ().length () ) + { + nonEmptyLength++; + } + } + urlQueue = new ArrayBlockingQueue < String > ( nonEmptyLength ); + for ( int urlCount = 0; urlCount < servers.length; urlCount++ ) + { + if ( 0 != servers[urlCount].trim ().length () ) + { + urlQueue.add ( servers[urlCount] ); + } + } + } + + @Override + public void reportServerUnAvailable ( ActiveServerConnection existingConnection, RetryRule rule ) + { + super.reportServerUnAvailable ( existingConnection, rule ); + } + + @Override + public ActiveServerConnection getServerMetadata () throws ServerUnavailableException + { + return super.getServerMetadata (); + } + + @Override + public void initServerStatusListener ( List < Object > nameNodeAddresses ) + { + // No Implementation + } + + @Override + public void notifyListenerServerAvailable ( ActiveServerConnection connection ) + { + // No Implementation + } + + @Override + public void notifyListenerServerUnavailable ( ActiveServerConnection connection ) + { + // No Implementation + } +} Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/util/HAClientUtil.java =================================================================== --- jdbc/src/java/org/apache/hadoop/hive/jdbc/util/HAClientUtil.java (revision 0) +++ jdbc/src/java/org/apache/hadoop/hive/jdbc/util/HAClientUtil.java (revision 0) @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.jdbc.util; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Properties; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.jdbc.HiveConnection; + +import org.apache.hadoop.hive.common.client.ActiveServerConnection; +import org.apache.hadoop.hive.common.client.HAConnector; +import org.apache.hadoop.hive.common.client.RetryModes; +import org.apache.hadoop.hive.common.client.RetryRule; + +public class HAClientUtil +{ + public static final String HIVE_CON_NO_RETRIES = "hive.connection.maxRetries"; + + public static final long HIVE_CON_NO_RETRIES_DEFAULT = 3; + + public static final String HIVE_CON_AWAIT_TIME = "hive.await.timeout"; + + public static final long HIVE_CON_AWAIT_TIME_DEFAULT = 120; + + public static final String HIVE_SERVERS = "hive.servers"; + + public static final String HA_HIVE_SITE_XML = "ha-hive-site.xml"; + + private static final String INVALID_SERVER_CONFIG = "[Validation : ha-hive-site.xml] - hive.servers property is invalid"; + + private static final Log LOG = LogFactory.getLog ( HAClientUtil.class.getName () ); + + public static boolean isValidHAConfig(Configuration configuration) { + try { + return validateServerConfig(configuration.getStrings(HIVE_SERVERS)); + } catch (IllegalArgumentException exception) { + LOG + .warn("The IP and Port number specified for hive servers " + + getHiveServers(configuration) + + " is invalid " + + exception.getMessage()); + return false; + } + } + + private static String getHiveServers(Configuration configuration) { + return Arrays.toString(configuration.getStrings ( HIVE_SERVERS )); + } + + // ---------------- HIVE SERVERS IP AND PORT VALIDATION ---------------------------- + private static boolean validateServerConfig ( String [] servers ) + { + int validUrlCounter = 0; + if ( ArrayUtils.isEmpty ( servers ) || servers.length < 2 ) + { + throw new IllegalArgumentException ( INVALID_SERVER_CONFIG ); + } + else + { + for ( int urlCount = 0; urlCount < servers.length; urlCount++ ) + { + if ( 0 != servers[urlCount].trim ().length () ) + { + validUrlCounter++; + } + } + if ( validUrlCounter < 2 ) + { + throw new IllegalArgumentException ( INVALID_SERVER_CONFIG ); + } + } + return true; + } + + public static Connection getConnection ( String url, Properties info ) throws SQLException + { + return new HiveConnection ( url, info ); + } + + public static long getRetryValue ( Configuration config, String propertyName, long defaultValue ) + { + Long retryValue = config.getLong ( propertyName, defaultValue ); + if ( retryValue <= 0 ) + { + StringBuffer msg = new StringBuffer (); + msg.append ( "Invalid value is configured for the Property:" ); + msg.append ( propertyName ); + msg.append ( " Value:" ); + msg.append ( retryValue ); + msg.append ( " .So the default value is considered:" + defaultValue ); + LOG.warn ( msg.toString () ); + retryValue = defaultValue; + } + return retryValue; + } + + public static void closeConnection ( Connection conn ) + { + if ( null != conn ) + { + try + { + conn.close (); + } + catch ( SQLException execption ) + { + LOG.error ( execption ); + } + } + } + + public static void waitBetweenRetries ( long time ) + { + try + { + Thread.sleep ( time ); + } + catch ( InterruptedException e ) + { + LOG.error ( e ); + } + } + + public static Configuration getConfig () + { + Configuration config = new Configuration (); + config.addResource ( HA_HIVE_SITE_XML ); + return config; + } + + public static Connection getActualHiveConnection ( HAConnector connector, String hiveServers ) + throws SQLException + { + try + { + ActiveServerConnection newConnection = connector.getServerMetadata (); + if ( null != newConnection ) + { + String metadata = ( String ) newConnection.getMetadata (); + LOG.info ( "Active HiveServer Connection URL : " + metadata ); + return HAClientUtil.getConnection ( metadata, null ); + } + else + { + throw new SQLException ( "HA Connection failed" ); + } + + } + catch ( Exception e ) + { + throw new SQLException ( + "Unable to get HiveConnection in HA-Mode for the following configurations [ha-hive-site.xml - hive.servers] : " + + hiveServers, e ); + } + } + + public static void startRetryOperation ( HAConnector connector, + ActiveServerConnection connection, long maxRetryTime, String hiveServer ) + throws SQLException + { + RetryRule rule = new RetryRule ( RetryModes.WAIT_MODE, maxRetryTime ); + try + { + connector.reportServerUnAvailable ( connection, rule ); + } + catch ( Exception exce ) + { + throw new SQLException ( + "Unable to get HiveConnection in HA-Mode for the following configurations [ha-hive-site.xml - hive.servers] : " + + hiveServer, exce ); + } + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServer.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServer.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServer.java (revision 0) @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.active; + +public interface ActiveHiveServer +{ + /** + * @return void + */ + public void startMetadataCopy () throws MetadataException; + + /** + * @param slavePort + * @return void + */ + public void endMetadataCopy ( String slavePort ) throws MetadataException; +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServerImpl.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServerImpl.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServerImpl.java (revision 0) @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.active; + +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hive.metastore.jmx.MetadataCopyTimer; +import org.apache.hadoop.hive.metastore.util.HiveConfigReader; +import org.apache.hadoop.hive.metastore.util.HiveDerbyUtils; +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; + +public class ActiveHiveServerImpl implements ActiveHiveServer +{ + private static final Log LOG = LogFactory.getLog ( ActiveHiveServerImpl.class.getName () ); + + private MetadataCopyTimer copyTimer = new MetadataCopyTimer (); + + @Override + public void endMetadataCopy ( String replicationCommand ) throws MetadataException + { + try + { + HaCommonUtils.getConnection ( replicationCommand ); + copyTimer.setMetadataCopy ( false ); + LOG.info ( "[ACTIVE SERVER]endMetadataCopy successful, Derby replication started" ); + } + catch ( Throwable e ) + { + LOG.error ( "[ACTIVE SERVER]endMetadataCopy failed", e ); + throw new MetadataException ( e ); + } + } + + @Override + public void startMetadataCopy () throws MetadataException + { + try + { + // --------------------------------------------------- + // TIMEOUT TASK FOR DB COPY + copyTimer.setLatch ( new CountDownLatch ( 1 ) ); + copyTimer.setMetadataCopy ( true ); + copyTimer.performTimeOutTask (); + copyTimer.getLatch ().await (); + // --------------------------------------------------- + + stopActiveReplication (); + HiveDerbyUtils.shutDownDerby (); + LOG.info ( "[ACTIVE SERVER]startMetadataCopy successful" ); + } + catch ( Exception e ) + { + LOG.error ( "[ACTIVE SERVER]startMetadataCopy failed", e ); + throw new MetadataException ( e ); + } + } + + private void stopActiveReplication () + { + try + { + HaCommonUtils.getConnection ( "jdbc:derby:" + + HiveConfigReader.getInstance ().getDbName () + ";stopMaster=true" ); + LOG.info ( "[ACTIVE SERVER]metadata replication stopped" ); + } + catch ( Throwable e ) + { + LOG.warn ( "[ACTIVE SERVER]Failed to stop metadata replication " + e.getMessage () ); + } + } + +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/active/MetadataException.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/active/MetadataException.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/active/MetadataException.java (revision 0) @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.active; + +public class MetadataException extends RuntimeException +{ + + private static final long serialVersionUID = -1385463276037833900L; + + /** + * @param smsgg + */ + public MetadataException ( String msg ) + { + super ( msg ); + } + + /** + * @param e + */ + public MetadataException ( Throwable e ) + { + super ( e ); + } + + /** + * @param e + */ + public MetadataException ( String msg, Throwable e ) + { + super ( msg, e ); + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/HiveJmx.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/HiveJmx.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/HiveJmx.java (revision 0) @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.jmx; + +import java.io.IOException; +import java.lang.reflect.Proxy; + +import javax.management.ObjectName; +import javax.management.remote.JMXServiceURL; + +import org.apache.hadoop.hive.metastore.active.ActiveHiveServer; + +public class HiveJmx +{ + public static ActiveHiveServer getProxy ( JMXServiceURL serviceURL, ObjectName objectName ) + throws IOException + { + return ( ActiveHiveServer ) Proxy.newProxyInstance ( ActiveHiveServer.class + .getClassLoader (), new Class [] { ActiveHiveServer.class }, new Invoker ( serviceURL, + objectName ) ); + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invocation.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invocation.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invocation.java (revision 0) @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.jmx; + +import java.io.ByteArrayOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.lang.reflect.Method; + +public class Invocation implements Externalizable +{ + + private String methodName; + + private Class < ? > [] parameterClasses; + + private Object [] parameters; + + public Invocation () + { + // For Externalizable + } + + public Invocation ( Method method, Object [] parameters ) + { + this.methodName = method.getName (); + this.parameterClasses = method.getParameterTypes (); + this.parameters = parameters; + } + + /** The name of the method invoked. */ + public String getMethodName () + { + return methodName; + } + + /** The parameter classes. */ + @SuppressWarnings("unchecked") + public Class [] getParameterClasses () + { + return parameterClasses; + } + + /** The parameter instances. */ + public Object [] getParameters () + { + return parameters; + } + + @Override + public void readExternal ( ObjectInput in ) throws IOException, ClassNotFoundException + { + methodName = ( String ) in.readObject (); + parameters = new Object [in.readInt ()]; + parameterClasses = new Class [parameters.length]; + for ( int i = 0; i < parameters.length; i++ ) + { + parameters[i] = in.readObject (); + parameterClasses[i] = parameters[i].getClass (); + } + } + + @Override + public void writeExternal ( ObjectOutput out ) throws IOException + { + out.writeObject ( methodName ); + out.writeInt ( parameterClasses.length ); + for ( int i = 0; i < parameterClasses.length; i++ ) + { + out.writeObject ( parameters[i] ); + } + } + + public byte [] getByteArray() throws Exception + { + ByteArrayOutputStream buf = new ByteArrayOutputStream(1024); + ObjectOutputStream outputStream = new ObjectOutputStream(buf); + try + { + outputStream.writeObject(this); + byte[] byteArray = buf.toByteArray(); + return byteArray; + } + finally + { + if (null!= outputStream) + { + outputStream.close(); + } + } + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invoker.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invoker.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invoker.java (revision 0) @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.jmx; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; + +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; +import org.apache.hadoop.hive.metastore.util.HaConstants; +import org.apache.hadoop.hive.metastore.active.MetadataException; + +public class Invoker implements InvocationHandler +{ + private static final long ONE_SEC = 1000; + + private static final long DEFAULT_WAIT_TIMEOUT = 120; + + private static final Log LOG = LogFactory.getLog ( Invoker.class.getName () ); + + private static long startupTimeout = HaCommonUtils.getNumericPropertyValue ( + HaConstants.ACTINE_HIVE_STARTUP_TIME_OUT, DEFAULT_WAIT_TIMEOUT ) * 1000; + + private final JMXServiceURL serviceUrl; + + private final ObjectName objectName; + + public Invoker ( JMXServiceURL serviceUrl, ObjectName objectName ) + { + this.serviceUrl = serviceUrl; + this.objectName = objectName; + } + + @Override + public Object invoke ( Object proxy, Method method, Object [] args ) throws Throwable + { + String [] invocationSignature = new String [] { byte[].class.getName () }; + Invocation invocation = new Invocation ( method, args ); + + byte[] byteArray = invocation.getByteArray(); + Object [] invocationParams = new Object [] { byteArray }; + JMXConnector connector = null; + MBeanServerConnection mbsc = null; + + // Retry logic happens for all 3 operations + // 1.Get JMX Connector + // 2.Get MBean Connection + // 3.Invoke JMX + + long startTime = System.currentTimeMillis (); + long failTime = 0L; + try + { + while ( failTime < startupTimeout ) + { + try + { + if ( null == connector ) + { + connector = JMXConnectorFactory.connect ( serviceUrl ); + } + if ( null == mbsc ) + { + mbsc = connector.getMBeanServerConnection (); + } + return mbsc.invoke ( objectName, "invoke", invocationParams, invocationSignature ); + } + catch ( Throwable e ) + { + LOG + .warn ( "Unable to connect to Active Server. StandBy Server will retry the operation",e ); + HaCommonUtils.sleepTimeOut ( ONE_SEC ); + failTime = System.currentTimeMillis () - startTime; + } + } + } + finally + { + if (null!= connector) + { + LOG.info("Closing the connector."); + connector.close(); + } + } + throw new MetadataException ( "Unable to connect to Active Server after timeout " + + startupTimeout + " millis." ); + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandler.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandler.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandler.java (revision 0) @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.jmx; + +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.lang.reflect.Method; + +public class JmxInvocationHandler implements JmxInvocationHandlerMBean { + + private final Object instance; + + public JmxInvocationHandler(Object instance) { + this.instance = instance; + } + + @Override + public Object invoke(byte [] invocationBytes) throws Exception { + + ByteArrayInputStream buf = new ByteArrayInputStream(invocationBytes); + ObjectInputStream inputStream = new ObjectInputStream(buf); + Invocation invocation = null; + try + { + invocation = (Invocation)inputStream.readObject(); + } + finally + { + if(null != inputStream) + { + inputStream.close(); + } + } + Method method = getMethod(invocation); + return method.invoke(instance, invocation.getParameters()); + } + + private Method getMethod(Invocation invocation) throws SecurityException, + NoSuchMethodException { + return instance.getClass().getMethod(invocation.getMethodName(), + invocation.getParameterClasses()); + } + +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandlerMBean.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandlerMBean.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandlerMBean.java (revision 0) @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.jmx; + +public interface JmxInvocationHandlerMBean +{ + public Object invoke ( byte [] invocation ) throws Exception; + +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimeOutThread.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimeOutThread.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimeOutThread.java (revision 0) @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.jmx; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; +import org.apache.hadoop.hive.metastore.util.HaConstants; + +public class MetadataCopyTimeOutThread extends Thread +{ + public static final long COPY_TIME_VALUE = HaCommonUtils.getNumericPropertyValue ( + HaConstants.COPY_TIME_OUT, HaConstants.COPY_TIME_OUT_DEFAULT ) * 1000; + + private static final int TIMEOUT_INTERVAL = 50; + + private MetadataCopyTimer metadataCopyTimer; + + private static final Log LOG = LogFactory.getLog ( MetadataCopyTimeOutThread.class.getName () ); + + public MetadataCopyTimeOutThread ( String threadName, MetadataCopyTimer metadataCopyTimer ) + { + this.setName ( threadName ); + this.metadataCopyTimer = metadataCopyTimer; + } + + @Override + public void run () + { + try + { + long awaitTime = 0; + HaCommonUtils.lock.writeLock ().lock (); + metadataCopyTimer.getLatch ().countDown (); + while ( metadataCopyTimer.isMetadataCopied () ) + { + + if ( awaitTime < COPY_TIME_VALUE ) + { + HaCommonUtils.sleepTimeOut ( TIMEOUT_INTERVAL ); + awaitTime = awaitTime + TIMEOUT_INTERVAL; + } + else + { + if ( metadataCopyTimer.isMetadataCopied () ) + { + metadataCopyTimer.setMetadataCopy ( false ); + LOG.info ( "Active HiveServer DB is blocked for metadata copy [db block timeout : " + + COPY_TIME_VALUE + " milli seconds]" ); + } + } + } + } + finally + { + HaCommonUtils.lock.writeLock ().unlock (); + } + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimer.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimer.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimer.java (revision 0) @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.jmx; + +import java.util.concurrent.CountDownLatch; + +public class MetadataCopyTimer +{ + + private volatile boolean isMetadataCopied = false; + + private CountDownLatch latch; + + + public boolean isMetadataCopied () + { + return isMetadataCopied; + } + + public void setMetadataCopy ( boolean isMetadataCopied ) + { + this.isMetadataCopied = isMetadataCopied; + } + + public void performTimeOutTask () + { + new MetadataCopyTimeOutThread ( "MetadataCopyTimeOutThread", this ).start (); + } + + public CountDownLatch getLatch () + { + return latch; + } + + public void setLatch ( CountDownLatch latch ) + { + this.latch = latch; + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/HaStandByServerThread.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/task/HaStandByServerThread.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/task/HaStandByServerThread.java (revision 0) @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.task; + +import java.net.UnknownHostException; +import java.sql.Connection; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; +import org.apache.hadoop.hive.metastore.util.HaConstants; +import org.apache.hadoop.hive.metastore.util.HiveConfigReader; + +public class HaStandByServerThread extends Thread +{ + + private static final Log LOG = LogFactory.getLog ( HaStandByServerThread.class.getName () ); + + @Override + public void run () + { + Connection connection = null; + try + { + String standByCommand = constructStandByCmd (); + LOG.info ( "Starting StandBy Server with URL: " + standByCommand ); + connection = HaCommonUtils.getConnection ( standByCommand ); + } + catch ( Exception e ) + { + if ( e.getMessage ().contains ( "Replication slave mode started successfully" ) ) + { + LOG.info ( "[STANDBY server] Replication slave mode started successfully" ); + LOG.info ( "HiveServer started in STANDBY mode" ); + } + else + { + LOG.error ( "Unable to start standBy server ", e ); + } + } + finally + { + HaCommonUtils.closeConnection ( connection ); + } + } + + private String constructStandByCmd () throws UnknownHostException + { + Properties conf = HiveConfigReader.getInstance ().getHAConf (); + StringBuilder builder = new StringBuilder (); + builder.append ( "jdbc:derby:" ); + builder.append ( HiveConfigReader.getInstance ().getDbName () ); + builder.append ( ";startSlave=true;slaveHost=" ); + builder.append ( HaCommonUtils.getLocalIPAddress () ); + builder.append ( ";slavePort=" ); + builder.append ( conf.getProperty ( HaConstants.SLAVE_PORT ) ); + return builder.toString (); + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveRegistry.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveRegistry.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveRegistry.java (revision 0) @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.task; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class HiveRegistry +{ + + private final static Map < String, Object > registry = new ConcurrentHashMap < String, Object > ( + 8 ); + + public static Object getObject ( String key ) + { + return registry.get ( key ); + } + + public static void register ( String key, Object object ) + { + registry.put ( key, object ); + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveStandByServer.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveStandByServer.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveStandByServer.java (revision 0) @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.task; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.util.Map; + +import javax.management.ObjectName; +import javax.management.remote.JMXServiceURL; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IOUtils; + +import org.apache.hadoop.hive.metastore.jmx.HiveJmx; +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; +import org.apache.hadoop.hive.metastore.util.HaConstants; +import org.apache.hadoop.hive.metastore.util.HiveConfigReader; +import org.apache.hadoop.hive.metastore.util.HiveZipUtils; +import org.apache.hadoop.hive.metastore.active.ActiveHiveServer; +import org.apache.hadoop.hive.metastore.active.MetadataException; + +public class HiveStandByServer +{ + private static final int BYTE_SIZE = 1024; + + private static final int ONE_SECOND = 1000; + + private final int maxWaitTime; + + private static final int DEFAULT_MAX_RETRIES = 5; + + private static final int MAX_WAIT_TIME = 120000; + + private static final Log LOG = LogFactory.getLog ( HiveStandByServer.class.getName () ); + + private HaStandByServerThread haStandByServerThread; + + private HiveConfigReader confReader = HiveConfigReader.getInstance (); + + private boolean isServerRunning = false; + + public HiveStandByServer () + { + this ( MAX_WAIT_TIME ); + } + + HiveStandByServer ( int maxWaitTime ) + { + this.maxWaitTime = maxWaitTime; + } + + public void init ( Map < String, String > activeServerProps ) throws Exception + { + String ipAddress = activeServerProps.get ( HaConstants.IP_ADDRESS ); + String jmxPort = activeServerProps.get ( HaConstants.JMX_PORT ); + + JMXServiceURL serviceURL = new JMXServiceURL ( "service:jmx:rmi:///jndi/rmi://" + ipAddress + + ':' + jmxPort + '/' + HaConstants.JMX_SERVICE_NAME ); + ActiveHiveServer activeHiveServer = HiveJmx.getProxy ( serviceURL, new ObjectName ( + HaConstants.MBEAN_NAME ) ); + + activeHiveServer.startMetadataCopy (); + + // COPY THE METADATA THROUGH SOCKET + readAndUnZipMetadata ( activeServerProps ); + + // START STANDBY IN SLAVE MODE + startStandByHive (); + + // WAIT FOR STANDBY STARTUP IN SLAVE MODE + waitForStanByDBStartUp (); + + // STOP COPY + activeHiveServer.endMetadataCopy ( constructReplicationCmd () ); + + isServerRunning = true; + LOG.info ( "[STANDBY SERVER] Started StandByServer successfully in replication mode" ); + } + + private String constructReplicationCmd () + { + StringBuilder builder = new StringBuilder (); + builder.append ( "jdbc:derby:" ); + builder.append ( confReader.getDbName () ); + builder.append ( ";startMaster=true;slaveHost=" ); + builder.append ( confReader.getHAConf ().getProperty ( HaConstants.IP_ADDRESS ) ); + builder.append ( ";slavePort=" ); + builder.append ( confReader.getHAConf ().getProperty ( HaConstants.SLAVE_PORT ) ); + return builder.toString (); + } + + private void readAndUnZipMetadata ( Map < String, String > activeServerProps ) + { + Socket clientSocket = null; + FileOutputStream fileOutputStream = null; + InputStream socketInputStream = null; + String zipFilePath = null; + try + { + zipFilePath = System.getProperty ( HaConstants.USER_DIR ) + File.separator + + confReader.getDbName () + HaConstants.ZIP_EXTENSION; + String activeServerIP = activeServerProps.get ( HaConstants.IP_ADDRESS ); + long startTime = System.currentTimeMillis (); + while ( null == clientSocket ) + { + int portNo = Integer.parseInt ( activeServerProps.get ( HaConstants.SOCKET_PORT ) ); + try + { + clientSocket = HaCommonUtils.getClientSocket ( activeServerIP, portNo ); + } + catch ( IOException e ) + { + long failTime = System.currentTimeMillis () - startTime; + if ( failTime > maxWaitTime ) + { + LOG.error ( "Connection cannot be obtained to Active Server at " + + activeServerIP + ':' + portNo + " even after " + maxWaitTime + + " milliseconds.", e ); + throw new MetadataException ( e ); + } + else + { + LOG + .warn ( "Retry the server socket connection portno for reading the active server database : " + + portNo ); + } + } + Thread.sleep ( ONE_SECOND ); + } + + socketInputStream = clientSocket.getInputStream (); + fileOutputStream = new FileOutputStream ( zipFilePath ); + IOUtils.copyBytes(socketInputStream, fileOutputStream, BYTE_SIZE, false); + fileOutputStream.flush (); + HiveZipUtils.unZipFile ( zipFilePath, HaCommonUtils.getDataBasePath (), true, true ); + LOG.info ( "Unzipping metadata from Active Server to :" + HaCommonUtils.getDataBasePath () ); + } + catch ( Exception e ) + { + LOG.error ( "Unable to read metadata zipfile " + zipFilePath, e ); + throw new MetadataException ( "exception occured while reading metadata as a zipfile", + e ); + } + finally + { + IOUtils.closeSocket ( clientSocket ); + IOUtils.closeStream ( fileOutputStream ); + IOUtils.closeStream ( socketInputStream ); + } + } + + private void startStandByHive () + { + haStandByServerThread = new HaStandByServerThread (); + haStandByServerThread.start (); + } + + private void waitForStanByDBStartUp () throws InterruptedException + { + String slavePort = confReader.getHAConf ().getProperty ( HaConstants.SLAVE_PORT ); + long maxRetries = HaCommonUtils.getNumericPropertyValue ( + HaConstants.MAX_SLAVE_DB_STARTUP_RETRIES, DEFAULT_MAX_RETRIES ); + long numRetries = maxRetries; + while ( false == HaCommonUtils.isPortAlreadyInUse ( Integer.parseInt ( slavePort ) ) ) + { + if ( LOG.isDebugEnabled () ) + { + LOG + .debug ( "Replication port is not in use for metadata replication, waiting for the port to be opened by standBy server" ); + } + Thread.sleep ( ONE_SECOND ); + if ( numRetries-- == 0 ) + { + throw new MetadataException ( "Slave metadata DB is not started even after " + + ( ONE_SECOND * maxRetries ) + " milliseconds " ); + } + } + } + + public void destroy () + { + if ( null != haStandByServerThread && haStandByServerThread.isAlive () ) + { + haStandByServerThread.interrupt (); + } + } + + public boolean isServerRunning () + { + return isServerRunning; + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/Keeper.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/task/Keeper.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/task/Keeper.java (revision 0) @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.task; + +import org.apache.hadoop.conf.Configuration; + +public interface Keeper +{ + + public static final String TASKKEEPER = "TASKKEEPER"; + + void record ( String path, Configuration conf ); + + void remove ( String path, Configuration conf ); + + void killAllJobs ( Configuration conf, String jobID ); + +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/MetadataSyncTask.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/task/MetadataSyncTask.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/task/MetadataSyncTask.java (revision 0) @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.task; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IOUtils; + +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; +import org.apache.hadoop.hive.metastore.util.HaConstants; +import org.apache.hadoop.hive.metastore.util.HiveConfigReader; +import org.apache.hadoop.hive.metastore.util.HiveZipUtils; + +public class MetadataSyncTask extends Thread +{ + + private static final int BYTE_SIZE = 1024; + + private ServerSocket serverSocket; + + private static final Log LOG = LogFactory.getLog ( MetadataSyncTask.class.getName () ); + + private volatile boolean alive = true; + + public MetadataSyncTask ( String threadName ) + { + this.setName ( threadName ); + } + + @Override + public void run () + { + compressAndWriteZipFileToSocket (); + } + + private void compressAndWriteZipFileToSocket () + { + Properties properties = HiveConfigReader.getInstance ().getHAConf (); + // -------------------------------------------------------------------- + LOG.info ( "Active Hive server is listening for the standby hive connection " ); + int portNo = Integer.parseInt ( ( String ) properties + .getProperty ( HaConstants.SOCKET_PORT ) ); + try + { + serverSocket = HaCommonUtils.getServerSocket ( portNo ); + } + catch ( IOException e ) + { + LOG.error ( "Error listening to socket " + portNo + + " Replication of metastore_db to Standby Server will not work", e ); + return; + } + + // -------------------------------------------------------------------- + Socket socket = null; + File file = null; + FileInputStream fileInputStream = null; + OutputStream socketOutputStream = null; + String destinationZipFile = null; + + while ( alive ) + { + try + { + socket = serverSocket.accept (); + LOG.info ( "Standby hive is connected. Compressing metadataDb for sending to the StandBy Server." ); + String dbPath = HaCommonUtils.getDataBasePath (); + destinationZipFile = dbPath + HaConstants.ZIP_EXTENSION; + HiveZipUtils.zipFile ( dbPath, destinationZipFile, true ); + socketOutputStream = socket.getOutputStream (); + file = new File ( destinationZipFile ); + fileInputStream = new FileInputStream ( file ); + IOUtils.copyBytes(fileInputStream, socketOutputStream, BYTE_SIZE, false); + socketOutputStream.flush (); + LOG + .info ( "Compressed and copied metadata db to the StandBy Hive Server." ); + } + catch ( Throwable e ) + { + LOG.warn ( "Exception occured while performing socket Operations", e ); + } + finally + { + IOUtils.closeSocket ( socket ); + IOUtils.closeStream ( fileInputStream ); + IOUtils.closeStream ( socketOutputStream ); + // ZIP FILE SHOULD BE DELETED AFTER FLUSH + try + { + if ( null != file ) + { + boolean success = file.delete (); + if ( success ) + { + if ( LOG.isDebugEnabled () ) + { + LOG + .debug ( "zip file deleted successfully : " + + destinationZipFile ); + } + } + else + { + LOG.warn ( "Unable to delete zip file " + destinationZipFile ); + } + } + } + catch ( Exception e ) + { + LOG.warn ( "Unable to delete zip file " + destinationZipFile, e ); + } + } + } + } + + public void closeSocket () + { + alive = false; + try + { + if ( null != serverSocket ) + { + serverSocket.close (); + LOG.info ( "ServerSocket Closed Successfully" ); + } + } + catch ( IOException exec ) + { + LOG.warn ( exec ); + } + } + +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/TaskKeeper.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/task/TaskKeeper.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/task/TaskKeeper.java (revision 0) @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.task; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; + +public class TaskKeeper implements Keeper +{ + private static final Log LOG = LogFactory.getLog ( TaskKeeper.class.getName () ); + + public void record ( String pathToCreate, Configuration conf ) + { + Path path = new Path ( HiveConf.ConfVars.HIVEJOBSPATH.defaultVal + pathToCreate ); + FSDataOutputStream os = null; + FileSystem fs; + try + { + fs = path.getFileSystem ( conf ); + os = fs.create ( path ); + } + catch ( Throwable e ) + { + LOG.warn ( "Exception while creating job record for jobId : " + pathToCreate, e ); + } + finally + { + IOUtils.cleanup ( LOG, os ); + } + } + + public void remove ( String pathToRemove, Configuration conf ) + { + Path path = new Path ( HiveConf.ConfVars.HIVEJOBSPATH.defaultVal + pathToRemove ); + try + { + deleteFile ( conf, path ); + } + catch ( Throwable e ) + { + LOG.warn ( "Exception while removing job record for jobId : " + pathToRemove, e ); + } + } + + private void deleteFile ( Configuration conf, Path path ) throws IOException + { + FileSystem fs = path.getFileSystem ( conf ); + fs.delete ( path, false ); + } + + public void killAllJobs ( Configuration conf, String rootPath ) + { + try + { + FileStatus [] childNodes = getChildNodes ( conf, rootPath ); + killAllRunningJobs ( conf, childNodes ); + removeAllNodes ( conf, rootPath ); + } + catch ( Throwable e ) + { + LOG.warn ( "Exception while killing jobs spawned by previously active process.", e ); + } + } + + private void removeAllNodes ( Configuration conf, String rootPath ) throws IOException + { + Path path = new Path ( rootPath ); + FileSystem fileSystem = path.getFileSystem ( conf ); + fileSystem.delete ( path, true ); + } + + private void killAllRunningJobs ( Configuration conf, FileStatus [] childNodes ) throws IOException + { + String jobId = null; + if ( null != childNodes ) + { + JobClient jobClient = new JobClient ( new JobConf ( conf ) ); + if ( null != jobClient ) + { + try + { + for ( FileStatus node : childNodes ) + { + jobId = node.getPath ().getName (); + kill ( jobClient, jobId ); + } + } + finally + { + jobClient.close (); + } + } + } + } + + private void kill ( JobClient jobClient, String jobId ) + { + RunningJob job; + try + { + job = jobClient.getJob ( JobID.forName ( jobId ) ); + if ( null != job ) + { + job.killJob (); + } + } + catch ( Throwable e ) + { + LOG.warn ( "Exception while killing job : " + jobId, e ); + } + } + + private FileStatus [] getChildNodes ( Configuration conf, String rootPath ) throws IOException + { + Path path = new Path ( rootPath ); + FileSystem fileSystem = path.getFileSystem ( conf ); + return fileSystem.listStatus ( path ); + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HaCommonUtils.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/util/HaCommonUtils.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HaCommonUtils.java (revision 0) @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.util; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.management.ObjectName; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hive.metastore.jmx.JmxInvocationHandler; +import org.apache.hadoop.hive.metastore.active.ActiveHiveServerImpl; +import org.apache.zk.leaderelection.manager.ProcessManager; +import org.apache.zk.leaderelection.jmx.JMXService; +import org.apache.zk.leaderelection.jmx.JMXServiceFactory; + +public class HaCommonUtils +{ + private static final String SLAVE_SHUTDOWN_SUCCESS = "XRE42"; + + private static final Log LOG = LogFactory.getLog ( HaCommonUtils.class.getName () ); + + public static final ReadWriteLock lock = new ReentrantReadWriteLock ( true ); + + private static final int SOCKET_TIMEOUT = getIntegerPropertyValue ( + HaConstants.SOCKET_SO_TIMEOUT, HaConstants.DEFAULT_SOCKET_TIMEOUT ); + + private static int EXIT_CODE = -1; + + public static String getLocalIPAddress () throws UnknownHostException + { + return InetAddress.getLocalHost ().getHostAddress (); + } + + /** + * @return + */ + public static String getDataBasePath () + { + return getDerbyLocalPath () + File.separator + HiveConfigReader.getInstance ().getDbName (); + } + + private static String getDerbyLocalPath () + { + String path = System.getProperty ( HaConstants.DERBY_HOME ); + if ( StringUtils.isEmpty ( path ) ) + { + path = System.getProperty ( HaConstants.USER_DIR ); + } + return path; + } + + /** + * @param port + * @return + */ + public static boolean isPortAlreadyInUse ( int port ) + { + try + { + ServerSocket srv = new ServerSocket ( port ); + srv.close (); + srv = null; + return false; + + } + catch ( IOException e ) + { + return true; + } + } + + /** + * @param activeServerIP + * @param portNo + * @return + * @throws UnknownHostException + * @throws IOException + */ + public static Socket getClientSocket ( String activeServerIP, int portNo ) throws IOException + { + Socket socket = new Socket ( activeServerIP, portNo ); + socket.setSoTimeout ( SOCKET_TIMEOUT ); + return socket; + } + + /** + * @param connectionString + * @return + * @throws SQLException + */ + public static Connection getConnection ( String connectionString ) throws SQLException + { + return DriverManager.getConnection ( connectionString ); + } + + /** + * @param propertyName + * @param defaultValue + * @return + */ + public static long getNumericPropertyValue ( String propertyName, long defaultValue ) + { + Properties properties = HiveConfigReader.getInstance ().getHAConf (); + if ( null == properties ) + { + // cover the scenario [Hive is started in Non HA mode ] + return defaultValue; + } + + String intializationTime = properties.getProperty ( propertyName ); + if ( null != intializationTime && 0 != intializationTime.trim ().length () ) + { + try + { + long parseLong = Long.parseLong ( intializationTime ); + if ( parseLong >= 0 ) + { + return parseLong; + } + } + catch ( NumberFormatException e ) + { + return defaultValue; + } + } + return defaultValue; + } + + public static int getIntegerPropertyValue ( String propertyName, int defaultValue ) + { + Properties properties = HiveConfigReader.getInstance ().getHAConf (); + if ( null == properties ) + { + // cover the scenario [Hive is started in Non HA mode ] + return defaultValue; + } + String intializationTime = properties.getProperty ( propertyName ); + if ( null != intializationTime && 0 != intializationTime.trim ().length () ) + { + try + { + int intValue = Integer.parseInt ( intializationTime ); + if ( intValue >= 0 ) + { + return intValue; + } + } + catch ( NumberFormatException e ) + { + return defaultValue; + } + } + return defaultValue; + } + + /** + * + */ + public static void stopSlaveDerby () + { + try + { + StringBuilder builder = new StringBuilder (); + builder.append ( "jdbc:derby:" ); + builder.append ( HiveConfigReader.getInstance ().getDbName () ); + builder.append ( ";stopSlave=true" ); + getConnection ( builder.toString () ); + } + catch ( SQLException e ) + { + if ( SLAVE_SHUTDOWN_SUCCESS.equals ( e.getSQLState () ) ) + { + LOG.info ( "[STANDBY SERVER] Derby metadata replication stopped successfully." ); + } + else + { + LOG.warn ( "[STANDBY SERVER]Failed to stop metadata replication", e ); + } + } + catch ( Throwable e ) + { + LOG.warn ( "[STANDBY SERVER]Failed to stop metadata replication", e ); + } + } + + /** + * @param portNo + * @return + * @throws IOException + */ + public static ServerSocket getServerSocket ( int portNo ) throws IOException + { + return new ServerSocket ( portNo ); + } + + /** + * @param config + * @param data + * @return + * @throws Exception + */ + public static void initializeProcess ( HashMap < String, String > config, + HashMap < String, String > data ) throws Exception + { + new ProcessManager ( config, data ).start (); + } + + public static void closeConnection ( Connection conn ) + { + if ( null != conn ) + { + try + { + conn.close (); + } + catch ( Exception e ) + { + LOG.warn ( "Unable to close the connection", e ); + } + } + } + + public static void registerMBean () throws Exception + { + JMXService service = JMXServiceFactory.getInstance ().getJMXService (); + service.registerMBean ( new JmxInvocationHandler ( new ActiveHiveServerImpl () ), + new ObjectName ( HaConstants.MBEAN_NAME ) ); + } + + public static void unRegisterMBean () + { + JMXService service = JMXServiceFactory.getInstance ().getJMXService (); + try + { + service.unRegisterMBean ( new ObjectName ( HaConstants.MBEAN_NAME ) ); + } + catch ( Exception e ) + { + LOG.warn ( "Unable to unregister MBean ", e ); + } + } + + public static void sleepTimeOut ( long timeOut ) + { + try + { + Thread.sleep ( timeOut ); + } + catch ( InterruptedException e ) + { + // ignore. + } + } + + public static void terminate () + { + System.exit ( EXIT_CODE ); + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HaConstants.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/util/HaConstants.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HaConstants.java (revision 0) @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.util; + +public class HaConstants +{ + public static final String ZIP_EXTENSION = ".zip"; + + public static final String JMX_SERVICE_NAME = "hive"; + + public static final String HIVE_EMBEDDED_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver"; + + public static final String OPERATION_RESULT = "result"; + + public static final String DATABASE_NAME = "databaseName"; + + public static final String USER_DIR = "user.dir"; + + public static final String JMX_PORT = "ha.jmx.port"; + + public static final String SOCKET_PORT = "hive.ha.socket.port"; + + public static final String IP_ADDRESS = "hive.ha.ipaddress"; + + public static final String SLAVE_PORT = "hive.ha.slave.port"; + + public static final String HA_HIVE_PROPERTIES = "ha-hive.properties"; + + public static final String HIVE_SERVER_EPHEMERAL_PATH_PREFIX = "/hiveserver_"; + + public static final String COPY_TIME_OUT = "hive.freeze.timeout"; + + public static final long COPY_TIME_OUT_DEFAULT = 120; + + public static final String DB_INITIALIZATION_STATUS = "DB_INITIALIZATION_STATUS"; + + public static final String JMX_CONNECTOR_PORT = "ha.jmx.connector.server.port"; + + public static final String IS_IN_HAMODE = "is.in.hamode"; + + public static final String STOP_REPLICATION = "stopReplication"; + + public static final String MAX_SLAVE_DB_STARTUP_RETRIES = "max.slave.db.startup.retries"; + + public static final String MAX_STOP_REPLICATION_MASTER_TIME = "max.stop.replication.master.time"; + + public static final String MAX_DB_INITALIZATION_TIME = "max.db.initilization.time"; + + public static final String HIVE_DEFAULT_XML = "hive-default.xml"; + + public static final String ACTINE_HIVE_STARTUP_TIME_OUT = "hive.active.server.startup.timeout"; + + public static final String MBEAN_NAME = "HiveHaMBean:name=haMBean"; + + public static final String SOCKET_SO_TIMEOUT = "hive.replication.socket.timeout"; + + public static final int DEFAULT_SOCKET_TIMEOUT = 180000; + + public static final String HIVE_SERVICE_NAME = "/hive"; + + public static final String DERBY_HOME = "derby.system.home"; +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveConfigReader.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveConfigReader.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveConfigReader.java (revision 0) @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.util; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; + + +public class HiveConfigReader +{ + private static final String DUMMY_PORT = "9000"; + + private static final String COLON = ":"; + + private static HiveConfigReader instance = new HiveConfigReader (); + + private Properties hAConf; + + private String dbName; + + private HiveConfigReader () + { + + } + + public static HiveConfigReader getInstance () + { + return instance; + } + + public void validateLocalProperties ( Properties properties ) + { + this.hAConf = properties; + + validatePortNumber ( properties.getProperty ( HaConstants.JMX_CONNECTOR_PORT ), + HaConstants.JMX_CONNECTOR_PORT ); + validatePortNumber ( properties.getProperty ( HaConstants.JMX_PORT ), HaConstants.JMX_PORT ); + validatePortNumber ( properties.getProperty ( HaConstants.SLAVE_PORT ), + HaConstants.SLAVE_PORT ); + validatePortNumber ( properties.getProperty ( HaConstants.SOCKET_PORT ), + HaConstants.SOCKET_PORT ); + validateNonEmptyString ( properties.getProperty ( HaConstants.IP_ADDRESS ), + HaConstants.IP_ADDRESS ); + validateIPAddress ( properties.getProperty ( HaConstants.IP_ADDRESS ) ); + } + + public void validateIPAddress ( String value ) + { + String initialValue = value; + value = value + COLON + DUMMY_PORT; + InetSocketAddress createSocketAddr = null; + try + { + createSocketAddr = NetUtils.createSocketAddr ( value ); + InetAddress address = createSocketAddr.getAddress (); + if ( null == address ) + { + throw new IllegalArgumentException ( HaConstants.IP_ADDRESS + " value is null " ); + } + else + { + if ( false == value.contains ( address.getHostAddress () ) ) + { + throw new IllegalArgumentException ( HaConstants.IP_ADDRESS + + " value is invalid : " + value ); + } + } + } + catch ( Exception e ) + { + throw new IllegalArgumentException ( HaConstants.IP_ADDRESS + " value is invalid : " + + initialValue ); + } + } + + public void validateNonEmptyString ( String value, String argName ) + { + if ( null == value ) + { + throw new IllegalArgumentException ( argName + " value is not entered" ); + } + else if ( 0 == value.trim ().length () ) + { + throw new IllegalArgumentException ( argName + " value should not be empty" ); + } + } + + public void validatePortNumber ( String portNo, String propertyName ) + { + if ( null == portNo ) + { + throw new IllegalArgumentException ( propertyName + " value is not entered" ); + } + else + { + try + { + int parseInt = Integer.parseInt ( portNo ); + if ( parseInt < 1024 || parseInt > 65535 ) + { + throw new IllegalArgumentException ( propertyName + " value is invalid [" + + portNo + + "], please enter a valid port number in given range [1024-65535]" ); + } + } + catch ( NumberFormatException e ) + { + throw new IllegalArgumentException ( propertyName + " value is invalid [" + portNo + + "], please enter a valid port number in the range [1024-65535]" ); + } + } + } + + public Properties getHAConf () + { + return hAConf; + } + + public String getDbName () + { + if ( null == dbName ) + { + Configuration configuration = new Configuration (); + configuration.addResource ( HaConstants.HIVE_DEFAULT_XML ); + dbName = configuration.get ( HaConstants.DATABASE_NAME ); + } + return dbName; + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveDerbyUtils.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveDerbyUtils.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveDerbyUtils.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.util; + +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + + +public class HiveDerbyUtils +{ + private static final String SHUTDOWN_DERBY_SUCCESS = "08006"; + + private static final Log LOG = LogFactory.getLog ( HiveDerbyUtils.class.getName () ); + + public static void shutDownDerby () + { + try + { + HaCommonUtils.getConnection ( "jdbc:derby:" + + HiveConfigReader.getInstance ().getDbName () + ";shutdown=true" ); + } + catch ( SQLException e ) + { + // SQL STATE : 08006 - means DB shutdown success in Derby + if ( SHUTDOWN_DERBY_SUCCESS.equals ( e.getSQLState () ) ) + { + LOG.warn ( "Stopped active derby successfully." ); + } + else + { + LOG.warn ( "Unable to stop active derby.", e ); + } + } + } +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveZipUtils.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveZipUtils.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveZipUtils.java (revision 0) @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.util; + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.Deque; +import java.util.Enumeration; +import java.util.LinkedList; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; +import java.util.zip.ZipOutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.zk.leaderelection.OperationFailedException; + +public class HiveZipUtils +{ + + private static final String FORWARD_SLASH = "/"; + + private static final Log LOG = LogFactory.getLog ( HiveZipUtils.class.getName () ); + + public static void zipFile ( String sourceFolder, String destinationZipFile, boolean overWrite ) + { + File inFolder = new File ( sourceFolder ); + File outFile = new File ( destinationZipFile ); + if ( overWrite == true ) + { + delete ( outFile ); + } + try + { + zip ( inFolder, outFile ); + } + catch ( IOException e ) + { + LOG.info ( "Unable to compress directory " + sourceFolder, e ); + } + } + + public static void unZipFile ( String sourceZipFilePath, String destinationFolderPath, + boolean overWrite, boolean deleteZipFile ) + { + + File sourceZipFile = new File ( sourceZipFilePath ); + File destinationFolder = new File ( destinationFolderPath ); + if ( overWrite == true ) + { + delete ( destinationFolder ); + try + { + unzip ( sourceZipFile, destinationFolder ); + } + catch ( IOException e ) + { + LOG.error ( "unable to Unzip file, fileName:" + sourceZipFilePath, e ); + throw new OperationFailedException ( "unable to Unzip file, fileName:" + + sourceZipFilePath, e ); + } + if ( deleteZipFile ) + { + delete ( sourceZipFile ); + } + } + + } + + private static void delete ( File sourceFolder ) + { + if ( sourceFolder.exists () ) + { + if ( sourceFolder.isDirectory () ) + { + File [] listFiles = sourceFolder.listFiles (); + for ( File file : listFiles ) + { + delete ( file ); + } + } + else + { + sourceFolder.delete (); + } + } + } + + public static void unzip ( File zipfile, File directory ) throws IOException + { + ZipFile zfile = new ZipFile ( zipfile ); + Enumeration < ? extends ZipEntry > entries = zfile.entries (); + File file = null; + while ( entries.hasMoreElements () ) + { + ZipEntry entry = entries.nextElement (); + file = new File ( directory, entry.getName () ); + if ( entry.isDirectory () ) + { + file.mkdirs (); + } + else + { + file.getParentFile ().mkdirs (); + InputStream in = zfile.getInputStream ( entry ); + try + { + copy ( in, file ); + } + finally + { + if ( null != in ) + { + in.close (); + } + } + } + } + } + + private static void copy ( InputStream in, File file ) throws IOException + { + OutputStream out = new FileOutputStream ( file ); + try + { + copy ( in, out ); + } + finally + { + if ( null != out ) + { + out.flush (); + out.close (); + } + } + } + + public static void zip ( File directory, File zipfile ) throws IOException + { + URI base = directory.toURI (); + Deque < File > queue = new LinkedList < File > (); + queue.push ( directory ); + OutputStream out = new FileOutputStream ( zipfile ); + Closeable res = out; + ZipOutputStream zout = null; + try + { + zout = new ZipOutputStream ( out ); + res = zout; + while ( false == queue.isEmpty () ) + { + directory = queue.pop (); + for ( File kid : directory.listFiles () ) + { + String name = base.relativize ( kid.toURI () ).getPath (); + if ( kid.isDirectory () ) + { + queue.push ( kid ); + name = name.endsWith ( FORWARD_SLASH ) ? name : name + FORWARD_SLASH; + zout.putNextEntry ( new ZipEntry ( name ) ); + } + else + { + zout.putNextEntry ( new ZipEntry ( name ) ); + copy ( kid, zout ); + zout.closeEntry (); + } + } + } + } + finally + { + if ( null != res ) + { + zout.flush (); + out.flush (); + res.close (); + out.close (); + } + } + } + + private static void copy ( InputStream in, OutputStream out ) throws IOException + { + byte [] buffer = new byte [1024]; + while ( true ) + { + int readCount = in.read ( buffer ); + if ( readCount < 0 ) + { + break; + } + out.write ( buffer, 0, readCount ); + } + } + + private static void copy ( File file, OutputStream out ) throws IOException + { + InputStream in = new FileInputStream ( file ); + try + { + copy ( in, out ); + } + finally + { + if ( null != in ) + { + in.close (); + } + } + } + +} Index: ql/src/java/conf/hive-exec-log4j.properties =================================================================== --- ql/src/java/conf/hive-exec-log4j.properties (revision 1151733) +++ ql/src/java/conf/hive-exec-log4j.properties (working copy) @@ -33,6 +33,26 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n +# +# Rolling File Appender - HA Framework +# +log4j.logger.haframework=INFO, HA +log4j.logger.org.I0Itec.zkclient=INFO, HA +log4j.logger.org.apache.zookeeper=INFO, HA + +log4j.appender.HA=org.apache.log4j.RollingFileAppender +log4j.appender.HA.File=${hive.log.dir}/HA_${hive.log.file} + +# Logfile size and and 10 backups +log4j.appender.HA.MaxFileSize=5MB +log4j.appender.HA.MaxBackupIndex=100 + +log4j.appender.HA.layout=org.apache.log4j.PatternLayout +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.HA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n +# Debugging Pattern format +log4j.appender.HA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + #custom logging levels #log4j.logger.xxx=DEBUG Index: service/src/java/org/apache/hadoop/hive/service/HaHiveServer.java =================================================================== --- service/src/java/org/apache/hadoop/hive/service/HaHiveServer.java (revision 0) +++ service/src/java/org/apache/hadoop/hive/service/HaHiveServer.java (revision 0) @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.service; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; +import org.apache.hadoop.hive.metastore.util.HaConstants; +import org.apache.hadoop.hive.metastore.util.HiveConfigReader; +import org.apache.zk.leaderelection.manager.ProcessManagerConstants; +import org.apache.hadoop.hive.common.PropertyReader; +import org.apache.hadoop.hive.common.LogUtility; + +public class HaHiveServer +{ + private static String [] statrUpArgs = new String [] {}; + + private static final Log LOG = LogFactory.getLog ( HaHiveServer.class ); + + static + { + try + { + Class.forName ( HaConstants.HIVE_EMBEDDED_DRIVER ); + } + catch ( ClassNotFoundException e ) + { + LOG.fatal ( "Unable to load driver : " + HaConstants.HIVE_EMBEDDED_DRIVER, e ); + throw new ExceptionInInitializerError ( e ); + } + } + + @SuppressWarnings("unchecked") + public static void main ( String [] args ) + { + try + { + LogUtility.initHiveLog4j (); + statrUpArgs = args; + // Validating the properties before Starting the HiveServer + Properties properties = PropertyReader.getProperties ( HaConstants.HA_HIVE_PROPERTIES ); + HiveConfigReader.getInstance ().validateLocalProperties ( properties ); + + properties.put ( ProcessManagerConstants.ZK_EPHE_PATH, + HaConstants.HIVE_SERVER_EPHEMERAL_PATH_PREFIX ); + properties + .put ( ProcessManagerConstants.JMX_SERVICE_NAME, HaConstants.JMX_SERVICE_NAME ); + properties.put ( ProcessManagerConstants.SERVICE_NAME, HaConstants.HIVE_SERVICE_NAME ); + + HashMap < String, String > data = getDataProperties ( properties ); + HashMap < String, String > config = new HashMap < String, String > ( ( Map ) properties ); + HaCommonUtils.initializeProcess ( config, data ); + } + catch ( Throwable throwable ) + { + LOG.fatal ( "Error occured while starting the Hive process.", throwable ); + HaCommonUtils.terminate (); + } + } + + private static HashMap < String, String > getDataProperties ( Properties hiveProperties ) + { + HashMap < String, String > dataProperties = new HashMap < String, String > ( 3, 1F ); + + dataProperties.put ( HaConstants.IP_ADDRESS, hiveProperties + .getProperty ( HaConstants.IP_ADDRESS ) ); + dataProperties.put ( HaConstants.SOCKET_PORT, hiveProperties + .getProperty ( HaConstants.SOCKET_PORT ) ); + dataProperties.put ( HaConstants.JMX_PORT, hiveProperties + .getProperty ( HaConstants.JMX_PORT ) ); + return dataProperties; + } + + /** + * @return + */ + public static String [] getStatrUpArgs () + { + return statrUpArgs; + } +} Index: service/src/java/org/apache/hadoop/hive/service/HiveLRM.java =================================================================== --- service/src/java/org/apache/hadoop/hive/service/HiveLRM.java (revision 0) +++ service/src/java/org/apache/hadoop/hive/service/HiveLRM.java (revision 0) @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.service; + +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.service.HiveServer; + +import org.apache.hadoop.hive.metastore.task.HiveStandByServer; +import org.apache.hadoop.hive.metastore.task.MetadataSyncTask; +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; +import org.apache.hadoop.hive.metastore.util.HaConstants; +import org.apache.hadoop.hive.metastore.util.HiveDerbyUtils; +import org.apache.hadoop.hive.common.HAThreadGroup; +import org.apache.zk.leaderelection.manager.LRM; +import org.apache.zk.leaderelection.OperationFailedException; +import org.apache.hadoop.hive.service.HaHiveServer; +import org.apache.hadoop.hive.metastore.task.HiveRegistry; +import org.apache.hadoop.hive.metastore.task.Keeper; +import org.apache.hadoop.hive.metastore.task.TaskKeeper; + +public class HiveLRM implements LRM +{ + + private static final String HIVE_STARTED_IN_ACTIVE_MODE = "HiveServer started in ACTIVE mode"; + + private static final String STARTING_HIVE_IN_ACTIVE = "Starting HiveServer in ACTIVE mode"; + + private static final String HIVE_STANDBY_FAILED = "Unable to start HiveServer in STANDBY mode"; + + private static final String HIVE_STANDBY_SUCCESS = "HiveServer started STANDBY mode"; + + private static final String HIVE_ACTIVE_FAILED = "Unable to start HiveServer in ACTIVE mode"; + + private static final String HIVE_STANDBY_TO_ACTIVE = "Starting HiveServer in ACTIVE [STANDBY to ACTIVE]"; + + private static final String HIVE_NEUTRAL_FAILED = "Unable to change HiveServer mode to NEUTRAL"; + + private static final String HIVE_NEUTRAL_SUCCESS = "HiveServer changed to NEUTRAL mode"; + + private static final Log LOG = LogFactory.getLog ( HiveLRM.class.getName () ); + + private HiveServer activeServer; + + private HiveStandByServer standByServer; + + private MetadataSyncTask metadataThread; + + private volatile boolean isActive = false; + + + /* + * + */ + public HiveLRM () + { + HiveRegistry.register ( Keeper.TASKKEEPER, new TaskKeeper () ); + // conf.setBoolean(HaConstants.IS_IN_HAMODE, true); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.service.HiveLRM#startInActive() + */ + @Override + public void startInActive () + { + startHiveServerInActiveMode (); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.service.HiveLRM#startInStandby(java.util.Map) + */ + @Override + public void startInStandby ( Map < String, String > activeServerProps ) + { + startHiveServerInStandByMode ( activeServerProps ); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.service.HiveLRM#activeToNeutral() + */ + @Override + public void activeToNeutral () + { + shutDownHiveServer ( HIVE_NEUTRAL_SUCCESS, HIVE_NEUTRAL_FAILED ); + HiveDerbyUtils.shutDownDerby (); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.service.HiveLRM#standByToActive() + */ + @Override + public void standByToActive () + { + LOG.info ( HIVE_STANDBY_TO_ACTIVE ); + ( ( Keeper ) HiveRegistry.getObject ( Keeper.TASKKEEPER ) ).killAllJobs ( + new Configuration (), HiveConf.ConfVars.HIVEJOBSPATH.defaultVal ); + stopStandByServer (); + startHiveServerInActiveMode (); + } + + private void stopStandByServer () + { + if ( null != standByServer ) + { + standByServer.destroy (); + } + HaCommonUtils.stopSlaveDerby (); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.service.HiveLRM#neutralToActive() + */ + @Override + public void neutralToActive () + { + startHiveServerInActiveMode (); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.service.HiveLRM#neutralToStandBy(java.util.Map) + */ + @Override + public void neutralToStandBy ( Map < String, String > activeServerProps ) + { + startHiveServerInStandByMode ( activeServerProps ); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.service.HiveLRM#standByToNeutral() + */ + @Override + public void standByToNeutral () + { + LOG.info ( "HiveServer state changed from STANDBY to NEUTRAL" ); + isActive = false; + stopStandByServer (); + HiveDerbyUtils.shutDownDerby (); + } + + protected void startMetadataTask () + { + metadataThread = new MetadataSyncTask ( "MetadataSyncTask" ); + metadataThread.start (); + } + + private void startHiveServerInActiveMode () + { + ( ( Keeper ) HiveRegistry.getObject ( Keeper.TASKKEEPER ) ).killAllJobs ( + new Configuration (), HiveConf.ConfVars.HIVEJOBSPATH.defaultVal ); + try + { + LOG.info ( STARTING_HIVE_IN_ACTIVE ); + startMetadataTask (); + activeServer = new HiveServer (); + spawnAndWait ( HaHiveServer.getStatrUpArgs () ); + + // REGISTER JMX MBEAN + HaCommonUtils.registerMBean (); + + isActive = true; + LOG.info ( HIVE_STARTED_IN_ACTIVE_MODE ); + } + catch ( Throwable e ) + { + isActive = false; + LOG.fatal ( HIVE_ACTIVE_FAILED, e ); + activeServer.destroy (); + throw new OperationFailedException ( HIVE_ACTIVE_FAILED, e ); + } + } + + private void startHiveServerInStandByMode ( Map < String, String > activeServerProps ) + { + try + { + // In either case, from neutral to standby or starting in standby, the isactive is + // set false + isActive = false; + // INITIALIZE STANDBY SERVER + standByServer = new HiveStandByServer (); + standByServer.init ( activeServerProps ); + LOG.info ( HIVE_STANDBY_SUCCESS ); + } + catch ( Throwable e ) + { + LOG.fatal ( HIVE_STANDBY_FAILED, e ); + stopStandByServer (); + throw new OperationFailedException ( HIVE_STANDBY_FAILED, e ); + } + } + + private void shutDownHiveServer ( String msgShutDownSuccess, String msgShutDownFail ) + { + isActive = false; + if ( null != activeServer ) + { + if ( activeServer.isServerRunning () ) + { + activeServer.destroy (); + cleanUpServerSocket (); + LOG.info ( msgShutDownSuccess ); + } + else + { + LOG.info ( msgShutDownFail ); + throw new OperationFailedException ( + "Exception occured while shutting down HiveServer" ); + } + HaCommonUtils.unRegisterMBean (); + } + else + { + LOG.info ( msgShutDownSuccess ); + } + } + + private void cleanUpServerSocket () + { + if ( null != metadataThread ) + { + metadataThread.closeSocket (); + } + } + + @Override + public boolean isPrimary () + { + return isActive; + } + + @Override + public void stop () + { + isActive = false; + } + + @Override + public void initialize ( Object obj ) + { + + } + + /** + * This method is used to start the JT in a separate thread. + * + * @throws Throwable + * On any exception rethrow + */ + protected void spawnAndWait ( String [] args ) throws Throwable + { + // create a threadgroup. + HAThreadGroup group = new HAThreadGroup ( "HiveHAStarter" ); + // Start the thread. + Thread thread = new Thread ( group, new HiveStarter ( activeServer, args ) ); + thread.start (); + + // Wait for hive to get started or re-throw the exception incase any exception is + // thrown on the startup + while ( true ) + { + // check the state if its running. + if ( activeServer.isServerRunning () ) + { + break; + } + else + { + // check if any exception thrown on starting the JT + Throwable exception = group.getException (); + if ( exception != null ) + { + throw exception; + } + } + // release cpu usage... + HaCommonUtils.sleepTimeOut ( 100 ); + } + } +} Index: service/src/java/org/apache/hadoop/hive/service/HiveServer.java =================================================================== --- service/src/java/org/apache/hadoop/hive/service/HiveServer.java (revision 1151733) +++ service/src/java/org/apache/hadoop/hive/service/HiveServer.java (working copy) @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.metastore.util.HaCommonUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.plan.api.QueryPlan; @@ -62,6 +63,7 @@ import org.apache.thrift.transport.TTransportFactory; import com.facebook.fb303.fb_status; +import com.sun.corba.se.impl.oa.toa.TOA; /** * Thrift Hive Server Implementation. @@ -85,6 +87,13 @@ private static final int DEFAULT_MAX_WORKER_THREADS = Integer.MAX_VALUE; /** + * To know the server status + */ + private volatile boolean isServerRunning = false; + + private TServer server; + + /** * Handler which implements the Hive Interface This class can be used in lieu * of the HiveClient class to get an embedded server. */ @@ -186,7 +195,12 @@ // case, when calling fetch quueries since execute() has returned. // For now, we disable the test attempts. driver.setTryCount(Integer.MAX_VALUE); - response = driver.run(cmd); + try { + HaCommonUtils.lock.readLock().lock(); + response = driver.run(cmd); + } finally { + HaCommonUtils.lock.readLock().unlock(); + } } else { isHiveQuery = false; driver = null; @@ -643,6 +657,10 @@ } public static void main(String[] args) { + new HiveServer().init(args); + } + + public void init(String[] args) { try { HiveServerCli cli = new HiveServerCli(); @@ -675,7 +693,7 @@ TThreadPoolServer.Options options = new TThreadPoolServer.Options(); options.minWorkerThreads = cli.minWorkerThreads; options.maxWorkerThreads = cli.maxWorkerThreads; - TServer server = new TThreadPoolServer(hfactory, serverTransport, + server = new TThreadPoolServer(hfactory, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), options); @@ -686,10 +704,28 @@ if (cli.isVerbose()) { System.err.println(msg); } - + isServerRunning = true; server.serve(); } catch (Exception x) { + isServerRunning = false; x.printStackTrace(); } } + + public boolean isServerRunning() { + return isServerRunning; + } + + public void setServerRunning(boolean isServerRunning) { + this.isServerRunning = isServerRunning; + } + + public void destroy() { + if (isServerRunning) { + if (null != server) { + server.stop(); + } + isServerRunning = false; + } + } } Index: service/src/java/org/apache/hadoop/hive/service/HiveStarter.java =================================================================== --- service/src/java/org/apache/hadoop/hive/service/HiveStarter.java (revision 0) +++ service/src/java/org/apache/hadoop/hive/service/HiveStarter.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.service; + +import org.apache.hadoop.hive.service.HiveServer; + +/** + * This will start the Hive in a separate thread because the serve will be a blocking call. + */ +public class HiveStarter implements Runnable +{ + + private final HiveServer hiveServer; + private final String [] args; + + public HiveStarter ( HiveServer hiveServer, String [] args ) + { + this.args = args; + this.hiveServer = hiveServer; + } + + @Override + public void run () + { + try + { + // start the hive and incase of any exception throw it as runtime exception. + hiveServer.init ( args ); + } + catch ( Exception e ) + { + // rethrow the exception and it will be caught by the hive Thread group. + throw new RuntimeException ( e ); + } + } + +}