Index: bin/ext/hivehaserver.sh
===================================================================
--- bin/ext/hivehaserver.sh (revision 0)
+++ bin/ext/hivehaserver.sh (revision 0)
@@ -0,0 +1,39 @@
+
+THISSERVICE=hivehaserver
+export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
+
+hivehaserver() {
+ echo "Starting Hive Thrift Server"
+ CLASS=org.apache.hadoop.hive.service.HaHiveServer
+ if $cygwin; then
+ HIVE_LIB=`cygpath -w "$HIVE_LIB"`
+ fi
+ JAR=${HIVE_LIB}/hive-service-*.jar
+
+ version=$($HADOOP version | awk '{if (NR == 1) {print $2;}}');
+
+ # Save the regex to a var to work around quoting incompatibilities
+ # between Bash 3.1 and 3.2
+ version_re="^([[:digit:]]+)\.([[:digit:]]+)(\.([[:digit:]]+))?.*$"
+
+ if [[ "$version" =~ $version_re ]]; then
+ major_ver=${BASH_REMATCH[1]}
+ minor_ver=${BASH_REMATCH[2]}
+ patch_ver=${BASH_REMATCH[4]}
+ else
+ echo ""
+ fi
+
+# if [ $minor_ver -lt 20 ]; then
+# exec ${$HADOOP} start jar $AUX_JARS_CMD_LINE $JAR $CLASS $HIVE_OPTS "$@"
+# else
+ # hadoop 20 or newer - skip the aux_jars option and hiveconf
+ exec $HADOOP jar $JAR $CLASS $HIVE_OPTS "$@"
+ # fi
+}
+
+hivehaserver_help() {
+  echo "usage HIVE_PORT=xxxx ./hive --service hivehaserver"
+  echo "  HIVE_PORT : Specify the server port"
+}
+
Index: common/src/java/conf/hive-log4j.properties
===================================================================
--- common/src/java/conf/hive-log4j.properties (revision 1151733)
+++ common/src/java/conf/hive-log4j.properties (working copy)
@@ -44,6 +44,28 @@
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# Rolling File Appender - HA Framework
+#
+log4j.logger.haframework=INFO, HA
+log4j.logger.org.I0Itec.zkclient=INFO, HA
+log4j.logger.org.apache.zookeeper=INFO, HA
+
+log4j.appender.HA=org.apache.log4j.RollingFileAppender
+log4j.appender.HA.File=${hive.log.dir}/HA_${hive.log.file}
+
+# Log file size capped at 5MB, keeping up to 100 backups
+log4j.appender.HA.MaxFileSize=5MB
+log4j.appender.HA.MaxBackupIndex=100
+
+log4j.appender.HA.layout=org.apache.log4j.PatternLayout
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.HA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+# Debugging Pattern format (disabled; uncomment to override the pattern above)
+#log4j.appender.HA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
#custom logging levels
#log4j.logger.xxx=DEBUG
Index: common/src/java/org/apache/hadoop/hive/common/client/ActiveServerConnection.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/client/ActiveServerConnection.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/client/ActiveServerConnection.java (revision 0)
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.client;
+
+public class ActiveServerConnection
+{
+ private long version;
+
+ private Object metadata;
+
+ public long getVersion ()
+ {
+ return version;
+ }
+
+ public void setVersion ()
+ {
+ this.version = System.nanoTime ();
+ }
+
+ public Object getMetadata ()
+ {
+ return metadata;
+ }
+
+ public void setMetadata ( Object metadata )
+ {
+ this.metadata = metadata;
+ }
+
+ // TODO: no-op close(); confirm callers no longer need this hook before removing it.
+ public void close ()
+ {
+ // do nothing
+ }
+
+}
Index: common/src/java/org/apache/hadoop/hive/common/client/HAConnector.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/client/HAConnector.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/client/HAConnector.java (revision 0)
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.client;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public abstract class HAConnector
+{
+ public static final String ACTIVE_SERVER_FINDER = "ACTIVE_SERVER_FINDER";
+
+ protected ActiveServerConnection connection;
+
+ private Object masterLock = new Object ();
+
+ private ActiveInstanceFinder activeServerFinder = null;
+
+ private CountDownLatch delayLatch = null;
+
+ final ReadWriteLock readWriteLock = new ReentrantReadWriteLock ();
+
+ private static final Log LOG = LogFactory.getLog ( HAConnector.class );
+
+ private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled ();
+
+ public abstract void initServerStatusListener ( List < Object > serverAddresses );
+
+ public abstract void notifyListenerServerAvailable ( ActiveServerConnection connection
+
+);
+
+ public abstract void notifyListenerServerUnavailable ( ActiveServerConnection connection
+
+);
+
+ public ActiveServerConnection getServerMetadata () throws ServerUnavailableException
+ {
+ if ( null == connection )
+ {
+ throw new ServerUnavailableException ( "Server connection is unavailable" );
+ }
+
+ return connection;
+ }
+
+ public void reportServerUnAvailable ( ActiveServerConnection existingConnection,
+
+RetryRule rule )
+ {
+ if ( null != existingConnection && null != connection )
+ {
+ if ( existingConnection.getVersion () < connection.getVersion () )
+ {
+ // Already new version of connection is available
+ return;
+ }
+ }
+
+ synchronized ( masterLock )
+ {
+ if ( null == activeServerFinder )
+ {
+ if ( null != connection )
+ {
+ // Null check: during startup the existing connection may still be null.
+ notifyListenerServerUnavailable ( connection );
+ }
+ delayLatch = new CountDownLatch( 1 );
+ activeServerFinder = new ActiveInstanceFinder ( this, delayLatch, readWriteLock );
+ activeServerFinder.setDaemon(true);
+ activeServerFinder.start ();
+
+ try
+ {
+ delayLatch.await ();
+ }
+ catch ( InterruptedException e )
+ {
+ LOG.error ( "Client Thread interrupted", e );
+ }
+ }
+ }
+
+ if ( rule.getModes ().equals ( RetryModes.WAIT_MODE ) )
+ {
+ Lock readLock = readWriteLock.readLock ();
+ boolean isLockAcquired = false;
+ try
+ {
+ if ( DEBUG_ENABLED )
+ {
+ LOG.debug ( "Client thread :" + Thread.currentThread ().getId ()
+ + " starting to wait for the server connection for a period of :"
+ + rule.getAwaitTimeout () + " secs" );
+ }
+ // after waiting for time out period can exit
+ isLockAcquired = readLock.tryLock ( rule.getAwaitTimeout (),
+
+TimeUnit.SECONDS );
+ }
+ catch ( InterruptedException e )
+ {
+ LOG.error ( "Retry Thread interrupted", e );
+ }
+ finally
+ {
+ if ( isLockAcquired )
+ {
+ readLock.unlock ();
+ }
+ }
+ if ( DEBUG_ENABLED )
+ {
+ LOG.debug ( "Client thread :" + Thread.currentThread ().getId ()
+ + " returning after wait period for the server connection" );
+ }
+ return;
+ }
+ else
+ {
+ // if non wait mode can exit immediately
+ return;
+ }
+ }
+
+ public abstract ActiveServerConnection connect () throws Exception;
+
+ public void setConnection ( ActiveServerConnection activeConnection )
+ {
+ synchronized ( masterLock )
+ {
+ // server available plugged in
+ notifyListenerServerAvailable ( activeConnection );
+ this.connection = activeConnection;
+ activeServerFinder = null;
+ }
+ }
+
+ public void setActiveConnection ( ActiveServerConnection activeConnection )
+ {
+ synchronized ( masterLock )
+ {
+ // Current connection can also be null.
+ //if both current and new connections are same then no need to change the connections.
+ if ( null != connection && activeConnection.getMetadata().equals(connection.getMetadata()) )
+ {
+ // increment the time stamp to notify clients that there's new connection
+ // after successful connection obtained.
+ connection.setVersion();
+
+ // server available notify with old connection.
+ notifyListenerServerAvailable ( connection );
+
+ // close the new connection.
+ activeConnection.close();
+ activeServerFinder = null;
+ return;
+ }
+ if(null != connection)
+ {
+ // Null check: during startup the existing connection may still be null.
+ // Close the existing connection before replacing it.
+ connection.close ();
+ }
+ setConnection(activeConnection);
+ }
+ }
+
+
+ //This method is currently called only by DFSClientHAConnector
+ //and JobClientHAConnector,
+ //It is added as part of defect DC-625
+ protected void destroy()
+ {
+ ActiveInstanceFinder tempActiveInstanceFinder = activeServerFinder;
+ if (null != tempActiveInstanceFinder)
+ {
+ tempActiveInstanceFinder.stopFinder();
+ try {
+ tempActiveInstanceFinder.join();
+ } catch (InterruptedException e) {
+ //no exception must be thrown here.
+ }
+ activeServerFinder = null;
+ }
+ }
+
+}
+
+
+
+class ActiveInstanceFinder extends Thread
+{
+ private final HAConnector haConnector;
+
+ private static final Log LOG = LogFactory.getLog ( ActiveInstanceFinder.class );
+
+ private CountDownLatch delayLatch;
+
+ private ReadWriteLock readWriteLock;
+
+ private boolean stop;
+
+ ActiveInstanceFinder ( HAConnector haConnector, CountDownLatch delayLatch,
+ ReadWriteLock readWriteLock )
+ {
+ super(HAConnector.ACTIVE_SERVER_FINDER);
+ this.haConnector = haConnector;
+ this.delayLatch = delayLatch;
+ this.readWriteLock = readWriteLock;
+ }
+
+ public void stopFinder(){
+ this.stop = true;
+ }
+
+ public void run ()
+ {
+ this.stop = false;
+ LOG.info ( "Retry operation started in back ground:");
+ ActiveServerConnection connection = null;
+ Lock writeLock = readWriteLock.writeLock ();
+ try
+ {
+ writeLock.lock ();
+
+ delayLatch.countDown ();
+
+ while ( !stop )
+ {
+ try
+ {
+ connection = this.haConnector.connect ();
+ if ( null != connection )
+ {
+ this.haConnector.setActiveConnection ( connection );
+ break;
+ }
+ }
+ catch ( Exception e )
+ {
+ if ( LOG.isDebugEnabled () )
+ {
+ LOG.debug ( "Exception occured while retrying:" + e.getMessage () ,e);
+ }
+ }
+ }
+ }
+ finally
+ {
+ writeLock.unlock ();
+ }
+ LOG.info ( "Retry operation completed in back ground." );
+ }
+}
+
+
+
+class NotifiableLockObject
+{
+ private boolean isNotified = false;
+
+ /**
+ * @return true if a notification has been recorded
+ */
+ public boolean isNotified ()
+ {
+ return isNotified;
+ }
+
+ /**
+ * @param isNotified whether a notification has been recorded
+ */
+ public void setNotified ( boolean isNotified )
+ {
+ this.isNotified = isNotified;
+ }
+}
Index: common/src/java/org/apache/hadoop/hive/common/client/RetryModes.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/client/RetryModes.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/client/RetryModes.java (revision 0)
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.client;
+
+public enum RetryModes
+{
+ WAIT_MODE ,
+ NON_WAIT_MODE ;
+}
Index: common/src/java/org/apache/hadoop/hive/common/client/RetryRule.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/client/RetryRule.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/client/RetryRule.java (revision 0)
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.client;
+
+public class RetryRule
+{
+ private RetryModes modes;
+
+ private Long awaitTimeout;
+
+ public RetryRule ( RetryModes modesThat, Long awaitTimeoutThat )
+ {
+ this.modes = modesThat;
+ this.awaitTimeout = awaitTimeoutThat;
+ }
+
+ /**
+ * @return RetryModes
+ */
+ public RetryModes getModes ()
+ {
+ return modes;
+ }
+
+ /**
+ * @return awaitTimeOut
+ */
+ public Long getAwaitTimeout ()
+ {
+ return awaitTimeout;
+ }
+}
Index: common/src/java/org/apache/hadoop/hive/common/client/ServerUnavailableException.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/client/ServerUnavailableException.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/client/ServerUnavailableException.java (revision 0)
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.client;
+
+public class ServerUnavailableException extends Exception
+{
+ private static final long serialVersionUID = -964356566606864942L;
+
+ public ServerUnavailableException ( String msg )
+ {
+ super ( msg );
+ }
+}
Index: common/src/java/org/apache/hadoop/hive/common/HAThreadGroup.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/HAThreadGroup.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/HAThreadGroup.java (revision 0)
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+public class HAThreadGroup extends ThreadGroup
+{
+
+ private Throwable throwable;
+
+ /**
+ * @param name
+ */
+ public HAThreadGroup ( String name )
+ {
+ super ( name );
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.ThreadGroup#uncaughtException(java.lang.Thread, java.lang.Throwable)
+ */
+ @Override
+ public void uncaughtException ( Thread t, Throwable e )
+ {
+ this.throwable = e;
+
+ }
+
+ /**
+ * This method is used to get the received exception from the thread.
+ *
+ * @return {@link Throwable}
+ */
+ public Throwable getException ()
+ {
+ return throwable;
+ }
+}
Index: common/src/java/org/apache/hadoop/hive/common/HiveConfUtils.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/HiveConfUtils.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/HiveConfUtils.java (revision 0)
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * For static loading of the HiveConf
+ */
+public class HiveConfUtils {
+
+ public static HiveConf hiveConf = new HiveConf(Configuration.class);
+
+}
Index: common/src/java/org/apache/hadoop/hive/common/LogUtility.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/LogUtility.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/LogUtility.java (revision 0)
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.net.URL;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.PropertyConfigurator;
+
+public class LogUtility
+{
+ private static final Log LOG = LogFactory.getLog ( LogUtility.class.getName () );
+
+ private static final String HIVE_L4J = "hive-log4j.properties";
+
+ public static void initHiveLog4j ()
+ {
+ // allow hive log4j to override any normal initialized one
+ URL hive_l4j = LogUtility.class.getClassLoader ().getResource ( HIVE_L4J );
+ if ( hive_l4j == null )
+ {
+ LOG.warn ( "Unable to load " + HIVE_L4J +". The file should be available in classpath. " );
+ }
+ else
+ {
+ LogManager.resetConfiguration ();
+ PropertyConfigurator.configure ( hive_l4j );
+ }
+ }
+
+}
Index: common/src/java/org/apache/hadoop/hive/common/PropertyReader.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/PropertyReader.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/PropertyReader.java (revision 0)
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class PropertyReader
+{
+ private static final Log LOG = LogFactory.getLog ( PropertyReader.class );
+
+ public static Properties getProperties ( String propertyFile )
+ {
+ InputStream resourceAsStream = PropertyReader.class.getClassLoader ().getResourceAsStream (
+ propertyFile );
+
+ Properties p = new Properties ();
+
+ if ( null == resourceAsStream )
+ {
+ throw new IllegalArgumentException ( "The property file : " + propertyFile
+ + " provided is not available." );
+ }
+
+ try
+ {
+ p.load ( resourceAsStream );
+ }
+ catch ( IOException e )
+ {
+ LOG.error ( "Exception while reading properties from :" + propertyFile );
+ }
+ finally
+ {
+ try
+ {
+ // null is already handled above.
+ resourceAsStream.close ();
+ }
+ catch ( IOException e )
+ {
+ LOG.error ( "Exception while closing the stream :" ,e);
+ }
+ }
+ return p;
+ }
+}
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1151733)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -457,7 +457,7 @@
HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
HIVE_REWORK_MAPREDWORK("hive.rework.mapredwork", false),
HIVE_CONCATENATE_CHECK_INDEX ("hive.exec.concatenate.check.index", true),
- ;
+ HIVEJOBSPATH("hive.jobs.path", "/tmp/" + System.getProperty("user.name") + "/hive-jobs/");
public final String varname;
public final String defaultVal;
Index: conf/ha-hive-site.xml
===================================================================
--- conf/ha-hive-site.xml (revision 0)
+++ conf/ha-hive-site.xml (revision 0)
@@ -0,0 +1,50 @@
+
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+
+<!-- Settings for the Hive HA framework -->
+<property>
+  <name>hive.servers</name>
+  <value></value>
+  <description>
+    The active and standby hive server ip address and port.
+    Only when a minimum of two servers are configured HA mode will be
+    considered.
+    example : jdbc:hive://10.18.52.138:10000/default,jdbc:hive://10.18.52.116:10000/default
+  </description>
+</property>
+
+
+<property>
+  <name>hive.query.execution.timeout</name>
+  <value>0</value>
+  <description>
+    After specified time it will kill the query execution. This is applicable per query.
+    The value entered here will be considered as milliseconds.
+    The maximum value is Integer.MAX(2147483647) value.
+    Value 0 is considered as infinite.
+  </description>
+</property>
+
+<property>
+  <name>hive.connection.maxRetries</name>
+  <value>3</value>
+  <description>
+    The number of maximum retries clients try to connect to one
+    hiveserver after which they try to switch over and connect to the other
+    available hiveserver. Optional Property with default value as 3.
+  </description>
+</property>
+
+<property>
+  <name>hive.await.timeout</name>
+  <value>120</value>
+  <description>
+    The connection timeout for the active ha hive connection.
+    The value should be configured in seconds. It is an optional property with default value 120 secs.
+  </description>
+</property>
+
+</configuration>
Index: conf/ha-hive.properties
===================================================================
--- conf/ha-hive.properties (revision 0)
+++ conf/ha-hive.properties (revision 0)
@@ -0,0 +1,73 @@
+########################################################
+#### Configurations for the HA-Common framework ####
+########################################################
+
+# The class name of local resource manager to be implemented for the HA switching
+# This should implement the interface org.apache.hadoop.hive.service.HiveLRM
+# eg: lrm.impl=org.apache.hadoop.hive.service.HiveLRM
+lrm.impl=org.apache.hadoop.hive.service.HiveLRM
+
+#Specifies the Jmx port to initialize the Jmx service,
+#that establishes the communication among HA servers.
+#Example : 4444 - Should give the valid port number
+ha.jmx.port=4444
+
+#Specifies the JMX Connector Server Port. [ This is the server port opened by JMXConnectorServer , if not
+#configured a Random Port will be considered.]
+#Eg : ha.jmx.connector.server.port=9999
+ha.jmx.connector.server.port=9999
+
+########################################################
+#### Configurations for the Zookeeper client ####
+########################################################
+
+# The address in form of the ipaddress and port where the zookeeper is running.
+# This should be a valid ipaddress and port.
+# This address configured should be same for both active and standby hive servers.
+# zookeeper.servers=:,:,:
+# eg: zk.address=10.18.52.25:2181,10.18.52.26:2181,10.18.52.27:2181
+zk.address=
+
+# The value of the zookeeper persisted node .
+# This path configured should be same for both active and standby hive servers.
+# eg: zk.root.path=/hadoop/ha
+zk.root.path=/hadoop/ha
+
+# Zookeeper client session timeout in milliseconds. Default timeout is 20000 milliseconds.
+# Minimum value is 20 seconds. This value is used by the zookeeper cluster to determine when the
+# client's session expires. Expirations happens when the cluster does not hear from the
+# client within the specified session timeout period (i.e. no heartbeat). At session expiration
+# the cluster will delete all ephemeral nodes owned by that session and notifies
+# all connected clients.
+# Should be tuned according to the network ping time of the ZK cluster and number of ZK nodes in the cluster.
+zk.session.timeout=20000
+
+# Zookeeper client connection timeout in milliseconds. Default timeout is 60000 milliseconds.
+# Minimum value is 60 seconds. Specifies the maximum time that the client waits to establish a
+# connection to Zookeeper.
+# Should be tuned according to the network ping time of the ZK cluster and number of ZK nodes in the cluster.
+zk.connection.timeout=60000
+
+
+
+########################################################
+#### Configurations for the hive server####
+########################################################
+#Specifies the port for network socket communication.
+#Network socket communication is required to copy the database
+#from active server to standby server.
+#Example : 6666 - Should give the valid port number
+hive.ha.socket.port=6666
+
+#Specifies the slave port to start the standby server.
+#Example : 8888 - Should give the valid port number
+hive.ha.slave.port=8888
+
+#Specifies the IP address of the machine where the hive server is getting started.
+#Example :10.10.10.10 - Should give the valid ipaddress
+hive.ha.ipaddress=
+
+#Active HiveServer - While synchronizing active metadata with standby,
+#maximum this much time the active metadata DB will not serve any operations.
+#Time in seconds
+hive.freeze.timeout=120
Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java
===================================================================
--- jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java (revision 1151733)
+++ jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java (working copy)
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.jdbc;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.service.HiveClient;
import org.apache.hadoop.hive.service.HiveInterface;
@@ -58,7 +59,14 @@
private SQLWarning warningChain = null;
private static final String URI_PREFIX = "jdbc:hive://";
+ public static final String HA_HIVE_SITE_XML = "ha-hive-site.xml";
+ static {
+ Configuration config = new Configuration();
+ config.addResource(HA_HIVE_SITE_XML);
+ }
+
+
/**
* TODO: - parse uri (use java.net.URI?).
*/
Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDriver.java
===================================================================
--- jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDriver.java (revision 1151733)
+++ jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDriver.java (working copy)
@@ -101,7 +101,7 @@
}
public Connection connect(String url, Properties info) throws SQLException {
- return new HiveConnection(url, info);
+ return new HiveHAConnection().getHiveConnection(url, info);
}
/**
Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnection.java
===================================================================
--- jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnection.java (revision 0)
+++ jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnection.java (revision 0)
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.jdbc;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hive.common.client.ActiveServerConnection;
+import org.apache.hadoop.hive.common.client.HAConnector;
+import org.apache.hadoop.hive.common.client.ServerUnavailableException;
+import org.apache.hadoop.hive.jdbc.util.HAClientUtil;
+
/**
 * Entry point used by {@code HiveDriver} to obtain a JDBC connection when the
 * client may be running against an HA (active/standby) HiveServer pair. Falls
 * back to a plain single-server connection when ha-hive-site.xml does not
 * contain a valid hive.servers list.
 */
public class HiveHAConnection
{
    private static final String OPEN_BRACE = "[";

    private static final String CLOSE_BRACE = "]";

    private static final String HIVE_CONNECTION_IN_NON_HA_MODE = "HiveClient is in Non-HA mode, server url : ";

    private static final String HIVE_CONNECTION_IN_HA_MODE = "HiveClient is in HA mode, server urls : ";

    // True once ha-hive-site.xml has been read and found to contain a valid
    // hive.servers list; validation is re-attempted on every call until then.
    private boolean isConfigValid = false;

    // HA client configuration loaded from ha-hive-site.xml.
    private Configuration config;

    // Maximum time (in seconds) to keep retrying for an active server.
    private long maxRetryTime = 0;

    private static final Log LOG = LogFactory.getLog ( HiveHAConnection.class.getName () );

    /**
     * Returns a Hive JDBC connection, routed through the HA framework when HA
     * is configured, otherwise built directly from the given URL.
     *
     * @param url JDBC URL; used to open the connection only in non-HA mode
     * @param info connection properties; used only in non-HA mode
     *        (NOTE(review): the HA path passes null properties downstream —
     *        confirm dropping {@code info} is intentional)
     * @return an open connection to a HiveServer
     * @throws SQLException if no connection could be established
     */
    public Connection getHiveConnection ( String url, Properties info ) throws SQLException
    {
        Connection connection = null;
        if ( isHAMode () )
        {
            LOG.info ( HIVE_CONNECTION_IN_HA_MODE + OPEN_BRACE
                + config.get ( HAClientUtil.HIVE_SERVERS ) + CLOSE_BRACE );
            connection = getHAConnection ();
        }
        else
        {
            LOG.info ( HIVE_CONNECTION_IN_NON_HA_MODE + OPEN_BRACE + url + CLOSE_BRACE );
            connection = HAClientUtil.getConnection ( url, info );
        }
        return connection;
    }

    /**
     * Lazily loads and validates the HA configuration. Once found valid, the
     * cached result (and retry budget) is reused for subsequent calls.
     */
    private boolean isHAMode ()
    {
        if ( false == isConfigValid )
        {
            config = HAClientUtil.getConfig ();
            isConfigValid = HAClientUtil.isValidHAConfig ( config );
            maxRetryTime = HAClientUtil.getRetryValue ( config, HAClientUtil.HIVE_CON_AWAIT_TIME,
                HAClientUtil.HIVE_CON_AWAIT_TIME_DEFAULT );

        }
        return isConfigValid;
    }

    // ------------------ GET HA HIVE CONNECTION ----------------------------
    /**
     * Asks the HA connector for the active server's URL and connects to it.
     * If the first connect attempt fails within the time budget, the failure
     * is reported to the connector and a bounded retry is started before
     * falling back to {@link HAClientUtil#getActualHiveConnection}.
     *
     * @throws SQLException when no active server could be reached in time
     */
    private Connection getHAConnection () throws SQLException
    {
        String hiveServers = config.get ( HAClientUtil.HIVE_SERVERS );
        HAConnector connector = HiveHAConnector.getInstance ( config );
        ActiveServerConnection activeServerMetadata = null;
        try
        {
            activeServerMetadata = connector.getServerMetadata ();
            if ( null != activeServerMetadata )
            {
                // GET ACTIVE SERVER CONNECTION
                logRetryOperation ();
                String url = ( String ) activeServerMetadata.getMetadata ();
                long activeServerStartTime = System.currentTimeMillis ();
                try
                {
                    return HAClientUtil.getConnection ( url, null );
                }
                catch ( SQLException e )
                {
                    // Elapsed wall-clock time of the failed attempt, converted
                    // to seconds to match the maxRetryTime unit.
                    long activeServerEndTime = System.currentTimeMillis ();
                    long totalTime = ( activeServerEndTime - activeServerStartTime ) / 1000;
                    if ( totalTime > maxRetryTime )
                    {
                        throw new SQLException (
                            "Unable to get HiveConnection in HA-Mode for the following configurations [ha-hive-site.xml - hive.servers] : "
                                + hiveServers, e );
                    }
                    // Report the failure and wait (up to the remaining budget)
                    // for another server to become active.
                    HAClientUtil.startRetryOperation ( connector, activeServerMetadata,
                        ( maxRetryTime - totalTime ), hiveServers );
                }
            }
        }
        catch ( ServerUnavailableException e )
        {
            logRetryOperation ();
            HAClientUtil.startRetryOperation ( connector, activeServerMetadata, maxRetryTime,
                hiveServers );
        }
        return HAClientUtil.getActualHiveConnection ( connector, hiveServers );
    }

    /** Logs that a bounded wait for the active HiveServer is starting. */
    private void logRetryOperation ()
    {
        LOG.info ( "Trying to get HiveConnection for ACTIVE HiveServer, maximum waiting time : "
            + maxRetryTime + " seconds" );
    }
}
Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnector.java
===================================================================
--- jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnector.java (revision 0)
+++ jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveHAConnector.java (revision 0)
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.jdbc;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hive.common.client.ActiveServerConnection;
+import org.apache.hadoop.hive.common.client.HAConnector;
+import org.apache.hadoop.hive.common.client.RetryRule;
+import org.apache.hadoop.hive.common.client.ServerUnavailableException;
+import org.apache.hadoop.hive.jdbc.util.HAClientUtil;
+
+public class HiveHAConnector extends HAConnector
+{
+ private static final long ONE_SECOND_WAIT = 1000;
+
+ private static final String HIVE_CONNECTION_RETRY_ON = "HiveConnection retry operation failed on URL : ";
+
+ private static final String UNABLE_FIND_THE_ACTIVE_SERVER = "Unable to find the active HiveServer";
+
+ private static volatile HiveHAConnector instance = null;
+
+ private Queue < String > urlQueue;
+
+ private Configuration configuration;
+
+ private long noOfRetries = 0;
+
+ private static final Log LOG = LogFactory.getLog ( HiveHAConnector.class.getName () );
+
+ public static HAConnector getInstance ( Configuration configuration )
+ {
+ if ( null == instance )
+ {
+ synchronized ( HiveHAConnector.class )
+ {
+ if ( null == instance )
+ {
+ instance = new HiveHAConnector ( configuration );
+ }
+ }
+ }
+ return instance;
+ }
+
+ private HiveHAConnector ( Configuration configuration )
+ {
+ this.configuration = configuration;
+ noOfRetries = HAClientUtil.getRetryValue ( configuration, HAClientUtil.HIVE_CON_NO_RETRIES,
+ HAClientUtil.HIVE_CON_NO_RETRIES_DEFAULT );
+ composeHiveUrls ();
+ }
+
+ public ActiveServerConnection connect ()
+ {
+ ActiveServerConnection connection = null;
+
+ // RETRY LOGIC
+ // -------------------------------------------------------------------------
+ String serverUrl = urlQueue.peek ();
+ for ( int retryCount = 0; retryCount < noOfRetries; retryCount++ )
+ {
+ LOG.info ( "HiveConnection is retrying on url : " + serverUrl + ", retryCount : "
+ + retryCount );
+ if ( isHiveServerActive ( serverUrl ) )
+ {
+ connection = new ActiveServerConnection ();
+ connection.setMetadata ( serverUrl );
+ return connection;
+ }
+ HAClientUtil.waitBetweenRetries ( ONE_SECOND_WAIT );
+ }
+ serverUrl = urlQueue.poll ();
+ urlQueue.add ( serverUrl );
+ throw new RuntimeException ( UNABLE_FIND_THE_ACTIVE_SERVER );
+ }
+
+ private boolean isHiveServerActive ( String url )
+ {
+ Connection conn = null;
+ try
+ {
+ conn = HAClientUtil.getConnection ( url, null );
+ return true;
+ }
+ catch ( SQLException e )
+ {
+ if ( LOG.isDebugEnabled () )
+ {
+ LOG.debug ( HIVE_CONNECTION_RETRY_ON + url + ", " + e.getMessage () );
+ }
+ }
+ finally
+ {
+ HAClientUtil.closeConnection ( conn );
+ }
+ return false;
+ }
+
+ private void composeHiveUrls ()
+ {
+ String [] servers = configuration.getStrings ( HAClientUtil.HIVE_SERVERS );
+ int nonEmptyLength = 0;
+ for ( int nonEmptyUrlCount = 0; nonEmptyUrlCount < servers.length; nonEmptyUrlCount++ )
+ {
+ if ( 0 != servers[nonEmptyUrlCount].trim ().length () )
+ {
+ nonEmptyLength++;
+ }
+ }
+ urlQueue = new ArrayBlockingQueue < String > ( nonEmptyLength );
+ for ( int urlCount = 0; urlCount < servers.length; urlCount++ )
+ {
+ if ( 0 != servers[urlCount].trim ().length () )
+ {
+ urlQueue.add ( servers[urlCount] );
+ }
+ }
+ }
+
+ @Override
+ public void reportServerUnAvailable ( ActiveServerConnection existingConnection, RetryRule rule )
+ {
+ super.reportServerUnAvailable ( existingConnection, rule );
+ }
+
+ @Override
+ public ActiveServerConnection getServerMetadata () throws ServerUnavailableException
+ {
+ return super.getServerMetadata ();
+ }
+
+ @Override
+ public void initServerStatusListener ( List < Object > nameNodeAddresses )
+ {
+ // No Implementation
+ }
+
+ @Override
+ public void notifyListenerServerAvailable ( ActiveServerConnection connection )
+ {
+ // No Implementation
+ }
+
+ @Override
+ public void notifyListenerServerUnavailable ( ActiveServerConnection connection )
+ {
+ // No Implementation
+ }
+}
Index: jdbc/src/java/org/apache/hadoop/hive/jdbc/util/HAClientUtil.java
===================================================================
--- jdbc/src/java/org/apache/hadoop/hive/jdbc/util/HAClientUtil.java (revision 0)
+++ jdbc/src/java/org/apache/hadoop/hive/jdbc/util/HAClientUtil.java (revision 0)
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.jdbc.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.jdbc.HiveConnection;
+
+import org.apache.hadoop.hive.common.client.ActiveServerConnection;
+import org.apache.hadoop.hive.common.client.HAConnector;
+import org.apache.hadoop.hive.common.client.RetryModes;
+import org.apache.hadoop.hive.common.client.RetryRule;
+
+public class HAClientUtil
+{
+ public static final String HIVE_CON_NO_RETRIES = "hive.connection.maxRetries";
+
+ public static final long HIVE_CON_NO_RETRIES_DEFAULT = 3;
+
+ public static final String HIVE_CON_AWAIT_TIME = "hive.await.timeout";
+
+ public static final long HIVE_CON_AWAIT_TIME_DEFAULT = 120;
+
+ public static final String HIVE_SERVERS = "hive.servers";
+
+ public static final String HA_HIVE_SITE_XML = "ha-hive-site.xml";
+
+ private static final String INVALID_SERVER_CONFIG = "[Validation : ha-hive-site.xml] - hive.servers property is invalid";
+
+ private static final Log LOG = LogFactory.getLog ( HAClientUtil.class.getName () );
+
+ public static boolean isValidHAConfig(Configuration configuration) {
+ try {
+ return validateServerConfig(configuration.getStrings(HIVE_SERVERS));
+ } catch (IllegalArgumentException exception) {
+ LOG
+ .warn("The IP and Port number specified for hive servers "
+ + getHiveServers(configuration)
+ + " is invalid "
+ + exception.getMessage());
+ return false;
+ }
+ }
+
+ private static String getHiveServers(Configuration configuration) {
+ return Arrays.toString(configuration.getStrings ( HIVE_SERVERS ));
+ }
+
+ // ---------------- HIVE SERVERS IP AND PORT VALIDATION ----------------------------
+ private static boolean validateServerConfig ( String [] servers )
+ {
+ int validUrlCounter = 0;
+ if ( ArrayUtils.isEmpty ( servers ) || servers.length < 2 )
+ {
+ throw new IllegalArgumentException ( INVALID_SERVER_CONFIG );
+ }
+ else
+ {
+ for ( int urlCount = 0; urlCount < servers.length; urlCount++ )
+ {
+ if ( 0 != servers[urlCount].trim ().length () )
+ {
+ validUrlCounter++;
+ }
+ }
+ if ( validUrlCounter < 2 )
+ {
+ throw new IllegalArgumentException ( INVALID_SERVER_CONFIG );
+ }
+ }
+ return true;
+ }
+
+ public static Connection getConnection ( String url, Properties info ) throws SQLException
+ {
+ return new HiveConnection ( url, info );
+ }
+
+ public static long getRetryValue ( Configuration config, String propertyName, long defaultValue )
+ {
+ Long retryValue = config.getLong ( propertyName, defaultValue );
+ if ( retryValue <= 0 )
+ {
+ StringBuffer msg = new StringBuffer ();
+ msg.append ( "Invalid value is configured for the Property:" );
+ msg.append ( propertyName );
+ msg.append ( " Value:" );
+ msg.append ( retryValue );
+ msg.append ( " .So the default value is considered:" + defaultValue );
+ LOG.warn ( msg.toString () );
+ retryValue = defaultValue;
+ }
+ return retryValue;
+ }
+
+ public static void closeConnection ( Connection conn )
+ {
+ if ( null != conn )
+ {
+ try
+ {
+ conn.close ();
+ }
+ catch ( SQLException execption )
+ {
+ LOG.error ( execption );
+ }
+ }
+ }
+
+ public static void waitBetweenRetries ( long time )
+ {
+ try
+ {
+ Thread.sleep ( time );
+ }
+ catch ( InterruptedException e )
+ {
+ LOG.error ( e );
+ }
+ }
+
+ public static Configuration getConfig ()
+ {
+ Configuration config = new Configuration ();
+ config.addResource ( HA_HIVE_SITE_XML );
+ return config;
+ }
+
+ public static Connection getActualHiveConnection ( HAConnector connector, String hiveServers )
+ throws SQLException
+ {
+ try
+ {
+ ActiveServerConnection newConnection = connector.getServerMetadata ();
+ if ( null != newConnection )
+ {
+ String metadata = ( String ) newConnection.getMetadata ();
+ LOG.info ( "Active HiveServer Connection URL : " + metadata );
+ return HAClientUtil.getConnection ( metadata, null );
+ }
+ else
+ {
+ throw new SQLException ( "HA Connection failed" );
+ }
+
+ }
+ catch ( Exception e )
+ {
+ throw new SQLException (
+ "Unable to get HiveConnection in HA-Mode for the following configurations [ha-hive-site.xml - hive.servers] : "
+ + hiveServers, e );
+ }
+ }
+
+ public static void startRetryOperation ( HAConnector connector,
+ ActiveServerConnection connection, long maxRetryTime, String hiveServer )
+ throws SQLException
+ {
+ RetryRule rule = new RetryRule ( RetryModes.WAIT_MODE, maxRetryTime );
+ try
+ {
+ connector.reportServerUnAvailable ( connection, rule );
+ }
+ catch ( Exception exce )
+ {
+ throw new SQLException (
+ "Unable to get HiveConnection in HA-Mode for the following configurations [ha-hive-site.xml - hive.servers] : "
+ + hiveServer, exce );
+ }
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServer.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServer.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServer.java (revision 0)
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.active;
+
/**
 * Remote contract exposed (over the JMX invocation bridge) by the ACTIVE
 * HiveServer so a standby peer can coordinate copying the metadata database.
 */
public interface ActiveHiveServer
{
    /**
     * Quiesces the active metadata store so its files can be copied: the
     * implementation arms a copy-timeout watchdog, stops replication and
     * shuts down the metadata DB.
     *
     * @throws MetadataException if the store cannot be quiesced
     */
    public void startMetadataCopy () throws MetadataException;

    /**
     * Signals that the copy has finished and restarts metadata replication.
     *
     * @param slavePort replication command identifying the standby peer
     *        (NOTE(review): the implementation treats this as a full JDBC
     *        replication command, not just a port — confirm the naming)
     * @throws MetadataException if replication cannot be restarted
     */
    public void endMetadataCopy ( String slavePort ) throws MetadataException;
}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServerImpl.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServerImpl.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/active/ActiveHiveServerImpl.java (revision 0)
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.active;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hive.metastore.jmx.MetadataCopyTimer;
+import org.apache.hadoop.hive.metastore.util.HiveConfigReader;
+import org.apache.hadoop.hive.metastore.util.HiveDerbyUtils;
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
+
/**
 * ACTIVE-side implementation of the metadata copy handshake. A standby server
 * drives these calls remotely (via the JMX invocation bridge) to quiesce the
 * active Derby metastore, copy its files, and then restart replication.
 */
public class ActiveHiveServerImpl implements ActiveHiveServer
{
    private static final Log LOG = LogFactory.getLog ( ActiveHiveServerImpl.class.getName () );

    // Watchdog that bounds how long the metadata DB stays frozen for the copy.
    private MetadataCopyTimer copyTimer = new MetadataCopyTimer ();

    /**
     * Ends the copy phase: executes the given Derby replication command (a
     * JDBC URL) to restart replication, then clears the copy-in-progress flag.
     *
     * @param replicationCommand JDBC URL/command handed to
     *        HaCommonUtils.getConnection to restart replication
     * @throws MetadataException if the replication command fails
     */
    @Override
    public void endMetadataCopy ( String replicationCommand ) throws MetadataException
    {
        try
        {
            HaCommonUtils.getConnection ( replicationCommand );
            copyTimer.setMetadataCopy ( false );
            LOG.info ( "[ACTIVE SERVER]endMetadataCopy successful, Derby replication started" );
        }
        catch ( Throwable e )
        {
            // Throwable is caught deliberately: any failure here (including
            // Errors from the Derby driver) must surface to the standby.
            LOG.error ( "[ACTIVE SERVER]endMetadataCopy failed", e );
            throw new MetadataException ( e );
        }
    }

    /**
     * Begins the copy phase: arms the timeout watchdog, awaits its latch
     * (NOTE(review): presumably released once the timeout task is scheduled —
     * confirm against MetadataCopyTimer), then stops master replication and
     * shuts down Derby so the DB files can be copied safely.
     *
     * @throws MetadataException if the store cannot be quiesced
     */
    @Override
    public void startMetadataCopy () throws MetadataException
    {
        try
        {
            // ---------------------------------------------------
            // TIMEOUT TASK FOR DB COPY
            copyTimer.setLatch ( new CountDownLatch ( 1 ) );
            copyTimer.setMetadataCopy ( true );
            copyTimer.performTimeOutTask ();
            copyTimer.getLatch ().await ();
            // ---------------------------------------------------

            stopActiveReplication ();
            HiveDerbyUtils.shutDownDerby ();
            LOG.info ( "[ACTIVE SERVER]startMetadataCopy successful" );
        }
        catch ( Exception e )
        {
            LOG.error ( "[ACTIVE SERVER]startMetadataCopy failed", e );
            throw new MetadataException ( e );
        }
    }

    // Issues "stopMaster=true" against the local Derby DB. Failure is only
    // logged (not rethrown) because replication may already be stopped.
    private void stopActiveReplication ()
    {
        try
        {
            HaCommonUtils.getConnection ( "jdbc:derby:"
                + HiveConfigReader.getInstance ().getDbName () + ";stopMaster=true" );
            LOG.info ( "[ACTIVE SERVER]metadata replication stopped" );
        }
        catch ( Throwable e )
        {
            LOG.warn ( "[ACTIVE SERVER]Failed to stop metadata replication " + e.getMessage () );
        }
    }

}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/active/MetadataException.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/active/MetadataException.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/active/MetadataException.java (revision 0)
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.active;
+
/**
 * Unchecked exception raised when the active/standby metadata copy handshake
 * fails (see {@code ActiveHiveServer}).
 */
public class MetadataException extends RuntimeException
{

    private static final long serialVersionUID = -1385463276037833900L;

    /**
     * @param msg description of the failure
     */
    public MetadataException ( String msg )
    {
        super ( msg );
    }

    /**
     * @param e underlying cause
     */
    public MetadataException ( Throwable e )
    {
        super ( e );
    }

    /**
     * @param msg description of the failure
     * @param e underlying cause
     */
    public MetadataException ( String msg, Throwable e )
    {
        super ( msg, e );
    }
}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/HiveJmx.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/HiveJmx.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/HiveJmx.java (revision 0)
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.jmx;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+
+import javax.management.ObjectName;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.hadoop.hive.metastore.active.ActiveHiveServer;
+
+public class HiveJmx
+{
+ public static ActiveHiveServer getProxy ( JMXServiceURL serviceURL, ObjectName objectName )
+ throws IOException
+ {
+ return ( ActiveHiveServer ) Proxy.newProxyInstance ( ActiveHiveServer.class
+ .getClassLoader (), new Class [] { ActiveHiveServer.class }, new Invoker ( serviceURL,
+ objectName ) );
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invocation.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invocation.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invocation.java (revision 0)
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.jmx;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Method;
+
/**
 * Serialisable description of a reflective method call: the method name, its
 * formal parameter types and the argument values. Shipped between the standby
 * and active HiveServer over JMX (see Invoker / JmxInvocationHandler).
 */
public class Invocation implements Externalizable
{

    private String methodName;

    private Class < ? > [] parameterClasses;

    private Object [] parameters;

    public Invocation ()
    {
        // For Externalizable
    }

    public Invocation ( Method method, Object [] parameters )
    {
        this.methodName = method.getName ();
        this.parameterClasses = method.getParameterTypes ();
        this.parameters = parameters;
    }

    /** The name of the method invoked. */
    public String getMethodName ()
    {
        return methodName;
    }

    /** The parameter classes. */
    @SuppressWarnings("unchecked")
    public Class [] getParameterClasses ()
    {
        return parameterClasses;
    }

    /** The parameter instances. */
    public Object [] getParameters ()
    {
        return parameters;
    }

    @Override
    public void readExternal ( ObjectInput in ) throws IOException, ClassNotFoundException
    {
        methodName = ( String ) in.readObject ();
        int length = in.readInt ();
        parameters = new Object [length];
        parameterClasses = new Class [length];
        for ( int i = 0; i < length; i++ )
        {
            // Read the declared parameter type explicitly instead of deriving
            // it from the value: value.getClass() would NPE on null arguments
            // and would yield the wrapper class (e.g. Integer) for primitive
            // formals (e.g. int), breaking the later getMethod() lookup.
            parameterClasses[i] = ( Class < ? > ) in.readObject ();
            parameters[i] = in.readObject ();
        }
    }

    @Override
    public void writeExternal ( ObjectOutput out ) throws IOException
    {
        out.writeObject ( methodName );
        out.writeInt ( parameterClasses.length );
        for ( int i = 0; i < parameterClasses.length; i++ )
        {
            // Write the declared type alongside the value so readExternal can
            // reconstruct the exact method signature.
            out.writeObject ( parameterClasses[i] );
            out.writeObject ( parameters[i] );
        }
    }

    /**
     * Serialises this invocation to a byte array for transport as the single
     * argument of the remote "invoke" JMX operation.
     *
     * @throws Exception if serialisation fails
     */
    public byte [] getByteArray () throws Exception
    {
        ByteArrayOutputStream buf = new ByteArrayOutputStream ( 1024 );
        ObjectOutputStream outputStream = new ObjectOutputStream ( buf );
        try
        {
            outputStream.writeObject ( this );
            // Flush before snapshotting: ObjectOutputStream buffers block
            // data, so toByteArray() without a flush can miss trailing bytes.
            outputStream.flush ();
            return buf.toByteArray ();
        }
        finally
        {
            outputStream.close ();
        }
    }
}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invoker.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invoker.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/Invoker.java (revision 0)
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.jmx;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
+import org.apache.hadoop.hive.metastore.util.HaConstants;
+import org.apache.hadoop.hive.metastore.active.MetadataException;
+
+public class Invoker implements InvocationHandler
+{
+ private static final long ONE_SEC = 1000;
+
+ private static final long DEFAULT_WAIT_TIMEOUT = 120;
+
+ private static final Log LOG = LogFactory.getLog ( Invoker.class.getName () );
+
+ private static long startupTimeout = HaCommonUtils.getNumericPropertyValue (
+ HaConstants.ACTINE_HIVE_STARTUP_TIME_OUT, DEFAULT_WAIT_TIMEOUT ) * 1000;
+
+ private final JMXServiceURL serviceUrl;
+
+ private final ObjectName objectName;
+
+ public Invoker ( JMXServiceURL serviceUrl, ObjectName objectName )
+ {
+ this.serviceUrl = serviceUrl;
+ this.objectName = objectName;
+ }
+
+ @Override
+ public Object invoke ( Object proxy, Method method, Object [] args ) throws Throwable
+ {
+ String [] invocationSignature = new String [] { byte[].class.getName () };
+ Invocation invocation = new Invocation ( method, args );
+
+ byte[] byteArray = invocation.getByteArray();
+ Object [] invocationParams = new Object [] { byteArray };
+ JMXConnector connector = null;
+ MBeanServerConnection mbsc = null;
+
+ // Retry logic happens for all 3 operations
+ // 1.Get JMX Connector
+ // 2.Get MBean Connection
+ // 3.Invoke JMX
+
+ long startTime = System.currentTimeMillis ();
+ long failTime = 0L;
+ try
+ {
+ while ( failTime < startupTimeout )
+ {
+ try
+ {
+ if ( null == connector )
+ {
+ connector = JMXConnectorFactory.connect ( serviceUrl );
+ }
+ if ( null == mbsc )
+ {
+ mbsc = connector.getMBeanServerConnection ();
+ }
+ return mbsc.invoke ( objectName, "invoke", invocationParams, invocationSignature );
+ }
+ catch ( Throwable e )
+ {
+ LOG
+ .warn ( "Unable to connect to Active Server. StandBy Server will retry the operation",e );
+ HaCommonUtils.sleepTimeOut ( ONE_SEC );
+ failTime = System.currentTimeMillis () - startTime;
+ }
+ }
+ }
+ finally
+ {
+ if (null!= connector)
+ {
+ LOG.info("Closing the connector.");
+ connector.close();
+ }
+ }
+ throw new MetadataException ( "Unable to connect to Active Server after timeout "
+ + startupTimeout + " millis." );
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandler.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandler.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandler.java (revision 0)
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.jmx;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Method;
+
+public class JmxInvocationHandler implements JmxInvocationHandlerMBean { // server side: deserializes an Invocation and applies it reflectively to 'instance'
+
+ private final Object instance; // target object that deserialized calls are dispatched to
+
+ public JmxInvocationHandler(Object instance) {
+ this.instance = instance;
+ }
+
+ @Override
+ public Object invoke(byte [] invocationBytes) throws Exception {
+
+ ByteArrayInputStream buf = new ByteArrayInputStream(invocationBytes);
+ ObjectInputStream inputStream = new ObjectInputStream(buf); // NOTE(review): Java-deserializes caller-supplied bytes; confirm all JMX callers are trusted
+ Invocation invocation = null;
+ try
+ {
+ invocation = (Invocation)inputStream.readObject();
+ }
+ finally
+ {
+ if(null != inputStream) // always non-null at this point; kept as defensive style
+ {
+ inputStream.close();
+ }
+ }
+ Method method = getMethod(invocation);
+ return method.invoke(instance, invocation.getParameters()); // reflective dispatch; result flows back to the remote JMX caller
+ }
+
+ private Method getMethod(Invocation invocation) throws SecurityException,
+ NoSuchMethodException {
+ return instance.getClass().getMethod(invocation.getMethodName(),
+ invocation.getParameterClasses()); // public-method lookup by name and parameter types
+ }
+
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandlerMBean.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandlerMBean.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/JmxInvocationHandlerMBean.java (revision 0)
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.jmx;
+
+public interface JmxInvocationHandlerMBean // MBean contract: carry a serialized method call across JMX
+{
+ public Object invoke ( byte [] invocation ) throws Exception; // invocation: a serialized Invocation; returns the reflective call's result
+
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimeOutThread.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimeOutThread.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimeOutThread.java (revision 0)
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.jmx;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
+import org.apache.hadoop.hive.metastore.util.HaConstants;
+
+public class MetadataCopyTimeOutThread extends Thread // watchdog: holds the global write lock during a metadata copy and force-releases on timeout
+{
+ public static final long COPY_TIME_VALUE = HaCommonUtils.getNumericPropertyValue (
+ HaConstants.COPY_TIME_OUT, HaConstants.COPY_TIME_OUT_DEFAULT ) * 1000; // configured copy timeout, converted from seconds to millis
+
+ private static final int TIMEOUT_INTERVAL = 50; // polling granularity in millis
+
+ private MetadataCopyTimer metadataCopyTimer; // shared flag/latch holder coordinating with the copy task
+
+ private static final Log LOG = LogFactory.getLog ( MetadataCopyTimeOutThread.class.getName () );
+
+ public MetadataCopyTimeOutThread ( String threadName, MetadataCopyTimer metadataCopyTimer )
+ {
+ this.setName ( threadName );
+ this.metadataCopyTimer = metadataCopyTimer;
+ }
+
+ @Override
+ public void run () // blocks other write-lock users for as long as the copy flag stays set
+ {
+ try
+ {
+ long awaitTime = 0; // millis spent polling so far
+ HaCommonUtils.lock.writeLock ().lock ();
+ metadataCopyTimer.getLatch ().countDown (); // signal the waiter that the write lock is now held
+ while ( metadataCopyTimer.isMetadataCopied () ) // spin until the copy task clears the flag, or the timeout branch clears it below
+ {
+
+ if ( awaitTime < COPY_TIME_VALUE )
+ {
+ HaCommonUtils.sleepTimeOut ( TIMEOUT_INTERVAL );
+ awaitTime = awaitTime + TIMEOUT_INTERVAL;
+ }
+ else
+ {
+ if ( metadataCopyTimer.isMetadataCopied () )
+ {
+ metadataCopyTimer.setMetadataCopy ( false ); // force-release after timeout so the DB is not blocked indefinitely
+ LOG.info ( "Active HiveServer DB is blocked for metadata copy [db block timeout : "
+ + COPY_TIME_VALUE + " milli seconds]" );
+ }
+ }
+ }
+ }
+ finally
+ {
+ HaCommonUtils.lock.writeLock ().unlock (); // always release, even if polling is interrupted
+ }
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimer.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimer.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/jmx/MetadataCopyTimer.java (revision 0)
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.jmx;
+
+import java.util.concurrent.CountDownLatch;
+
+public class MetadataCopyTimer // mutable state shared between the metadata copy task and its timeout watchdog
+{
+
+ private volatile boolean isMetadataCopied = false; // true while a copy is in progress; cleared on completion or watchdog timeout
+
+ private CountDownLatch latch; // set by the caller; counted down once the watchdog holds the write lock
+
+
+ public boolean isMetadataCopied ()
+ {
+ return isMetadataCopied;
+ }
+
+ public void setMetadataCopy ( boolean isMetadataCopied )
+ {
+ this.isMetadataCopied = isMetadataCopied;
+ }
+
+ public void performTimeOutTask () // spawns the watchdog thread that enforces COPY_TIME_VALUE
+ {
+ new MetadataCopyTimeOutThread ( "MetadataCopyTimeOutThread", this ).start ();
+ }
+
+ public CountDownLatch getLatch ()
+ {
+ return latch; // NOTE(review): null until setLatch() is called — callers must set it before performTimeOutTask()
+ }
+
+ public void setLatch ( CountDownLatch latch )
+ {
+ this.latch = latch;
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/HaStandByServerThread.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/task/HaStandByServerThread.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/task/HaStandByServerThread.java (revision 0)
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.task;
+
+import java.net.UnknownHostException;
+import java.sql.Connection;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
+import org.apache.hadoop.hive.metastore.util.HaConstants;
+import org.apache.hadoop.hive.metastore.util.HiveConfigReader;
+
+public class HaStandByServerThread extends Thread // starts the local Derby DB in replication-slave mode via a JDBC connect
+{
+
+ private static final Log LOG = LogFactory.getLog ( HaStandByServerThread.class.getName () );
+
+ @Override
+ public void run ()
+ {
+ Connection connection = null;
+ try
+ {
+ String standByCommand = constructStandByCmd ();
+ LOG.info ( "Starting StandBy Server with URL: " + standByCommand );
+ connection = HaCommonUtils.getConnection ( standByCommand ); // Derby signals slave startup via an exception, handled below
+ }
+ catch ( Exception e )
+ {
+ if ( e.getMessage ().contains ( "Replication slave mode started successfully" ) ) // NOTE(review): getMessage() may be null -> NPE; a null guard is advisable
+ {
+ LOG.info ( "[STANDBY server] Replication slave mode started successfully" );
+ LOG.info ( "HiveServer started in STANDBY mode" );
+ }
+ else
+ {
+ LOG.error ( "Unable to start standBy server ", e ); // genuine startup failure
+ }
+ }
+ finally
+ {
+ HaCommonUtils.closeConnection ( connection );
+ }
+ }
+
+ private String constructStandByCmd () throws UnknownHostException // builds the Derby startSlave JDBC URL for this host
+ {
+ Properties conf = HiveConfigReader.getInstance ().getHAConf ();
+ StringBuilder builder = new StringBuilder ();
+ builder.append ( "jdbc:derby:" );
+ builder.append ( HiveConfigReader.getInstance ().getDbName () );
+ builder.append ( ";startSlave=true;slaveHost=" );
+ builder.append ( HaCommonUtils.getLocalIPAddress () ); // this (standby) machine receives the replication stream
+ builder.append ( ";slavePort=" );
+ builder.append ( conf.getProperty ( HaConstants.SLAVE_PORT ) );
+ return builder.toString ();
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveRegistry.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveRegistry.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveRegistry.java (revision 0)
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.task;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HiveRegistry // process-wide key/object registry backed by a ConcurrentHashMap
+{
+
+ private final static Map < String, Object > registry = new ConcurrentHashMap < String, Object > (
+ 8 ); // small initial capacity; grows as needed
+
+ public static Object getObject ( String key )
+ {
+ return registry.get ( key ); // null when nothing was registered under key
+ }
+
+ public static void register ( String key, Object object )
+ {
+ registry.put ( key, object ); // replaces any previous registration for key
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveStandByServer.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveStandByServer.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/task/HiveStandByServer.java (revision 0)
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.task;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.util.Map;
+
+import javax.management.ObjectName;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.hadoop.hive.metastore.jmx.HiveJmx;
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
+import org.apache.hadoop.hive.metastore.util.HaConstants;
+import org.apache.hadoop.hive.metastore.util.HiveConfigReader;
+import org.apache.hadoop.hive.metastore.util.HiveZipUtils;
+import org.apache.hadoop.hive.metastore.active.ActiveHiveServer;
+import org.apache.hadoop.hive.metastore.active.MetadataException;
+
+public class HiveStandByServer // bootstraps this node as the standby: pulls the active server's metastore DB and starts Derby replication
+{
+ private static final int BYTE_SIZE = 1024; // socket copy buffer size in bytes
+
+ private static final int ONE_SECOND = 1000; // millis
+
+ private final int maxWaitTime; // millis to keep retrying the metadata socket connection
+
+ private static final int DEFAULT_MAX_RETRIES = 5; // default retries while waiting for the slave DB port
+
+ private static final int MAX_WAIT_TIME = 120000; // default connect-retry window: 2 minutes
+
+ private static final Log LOG = LogFactory.getLog ( HiveStandByServer.class.getName () );
+
+ private HaStandByServerThread haStandByServerThread; // thread that starts Derby in slave mode; null until startStandByHive()
+
+ private HiveConfigReader confReader = HiveConfigReader.getInstance ();
+
+ private boolean isServerRunning = false; // set true only after init() completes every step
+
+ public HiveStandByServer ()
+ {
+ this ( MAX_WAIT_TIME );
+ }
+
+ HiveStandByServer ( int maxWaitTime ) // package-private: lets tests shorten the retry window
+ {
+ this.maxWaitTime = maxWaitTime;
+ }
+
+ public void init ( Map < String, String > activeServerProps ) throws Exception // activeServerProps: IP/JMX port/socket port of the active server
+ {
+ String ipAddress = activeServerProps.get ( HaConstants.IP_ADDRESS );
+ String jmxPort = activeServerProps.get ( HaConstants.JMX_PORT );
+
+ JMXServiceURL serviceURL = new JMXServiceURL ( "service:jmx:rmi:///jndi/rmi://" + ipAddress
+ + ':' + jmxPort + '/' + HaConstants.JMX_SERVICE_NAME );
+ ActiveHiveServer activeHiveServer = HiveJmx.getProxy ( serviceURL, new ObjectName (
+ HaConstants.MBEAN_NAME ) ); // JMX proxy for remote calls onto the active server
+
+ activeHiveServer.startMetadataCopy ();
+
+ // Step 1: pull the zipped metastore DB from the active server over a socket
+ readAndUnZipMetadata ( activeServerProps );
+
+ // Step 2: start the local Derby DB in replication-slave mode
+ startStandByHive ();
+
+ // Step 3: block until the slave DB opens its replication port
+ waitForStanByDBStartUp ();
+
+ // Step 4: tell the active server to end the copy and start master replication
+ activeHiveServer.endMetadataCopy ( constructReplicationCmd () );
+
+ isServerRunning = true;
+ LOG.info ( "[STANDBY SERVER] Started StandByServer successfully in replication mode" );
+ }
+
+ private String constructReplicationCmd () // builds the Derby startMaster JDBC URL executed on the ACTIVE server
+ {
+ StringBuilder builder = new StringBuilder ();
+ builder.append ( "jdbc:derby:" );
+ builder.append ( confReader.getDbName () );
+ builder.append ( ";startMaster=true;slaveHost=" );
+ builder.append ( confReader.getHAConf ().getProperty ( HaConstants.IP_ADDRESS ) );
+ builder.append ( ";slavePort=" );
+ builder.append ( confReader.getHAConf ().getProperty ( HaConstants.SLAVE_PORT ) );
+ return builder.toString ();
+ }
+
+ private void readAndUnZipMetadata ( Map < String, String > activeServerProps ) // downloads the zipped DB over a socket and unzips it locally
+ {
+ Socket clientSocket = null;
+ FileOutputStream fileOutputStream = null;
+ InputStream socketInputStream = null;
+ String zipFilePath = null;
+ try
+ {
+ zipFilePath = System.getProperty ( HaConstants.USER_DIR ) + File.separator
+ + confReader.getDbName () + HaConstants.ZIP_EXTENSION;
+ String activeServerIP = activeServerProps.get ( HaConstants.IP_ADDRESS );
+ long startTime = System.currentTimeMillis ();
+ while ( null == clientSocket ) // retry until connected or maxWaitTime exceeded
+ {
+ int portNo = Integer.parseInt ( activeServerProps.get ( HaConstants.SOCKET_PORT ) );
+ try
+ {
+ clientSocket = HaCommonUtils.getClientSocket ( activeServerIP, portNo );
+ }
+ catch ( IOException e )
+ {
+ long failTime = System.currentTimeMillis () - startTime;
+ if ( failTime > maxWaitTime )
+ {
+ LOG.error ( "Connection cannot be obtained to Active Server at "
+ + activeServerIP + ':' + portNo + " even after " + maxWaitTime
+ + " milliseconds.", e );
+ throw new MetadataException ( e );
+ }
+ else
+ {
+ LOG
+ .warn ( "Retry the server socket connection portno for reading the active server database : "
+ + portNo );
+ }
+ }
+ Thread.sleep ( ONE_SECOND ); // NOTE(review): also sleeps once after a successful connect; harmless but could live inside the catch
+ }
+
+ socketInputStream = clientSocket.getInputStream ();
+ fileOutputStream = new FileOutputStream ( zipFilePath );
+ IOUtils.copyBytes(socketInputStream, fileOutputStream, BYTE_SIZE, false); // false: streams are closed in the finally block below
+ fileOutputStream.flush ();
+ HiveZipUtils.unZipFile ( zipFilePath, HaCommonUtils.getDataBasePath (), true, true );
+ LOG.info ( "Unzipping metadata from Active Server to :" + HaCommonUtils.getDataBasePath () );
+ }
+ catch ( Exception e )
+ {
+ LOG.error ( "Unable to read metadata zipfile " + zipFilePath, e );
+ throw new MetadataException ( "exception occured while reading metadata as a zipfile",
+ e );
+ }
+ finally
+ {
+ IOUtils.closeSocket ( clientSocket );
+ IOUtils.closeStream ( fileOutputStream );
+ IOUtils.closeStream ( socketInputStream );
+ }
+ }
+
+ private void startStandByHive () // spawns the thread that issues the Derby startSlave connect
+ {
+ haStandByServerThread = new HaStandByServerThread ();
+ haStandByServerThread.start ();
+ }
+
+ private void waitForStanByDBStartUp () throws InterruptedException // polls until the slave replication port is open, or retries run out
+ {
+ String slavePort = confReader.getHAConf ().getProperty ( HaConstants.SLAVE_PORT );
+ long maxRetries = HaCommonUtils.getNumericPropertyValue (
+ HaConstants.MAX_SLAVE_DB_STARTUP_RETRIES, DEFAULT_MAX_RETRIES );
+ long numRetries = maxRetries; // counts down one per second of waiting
+ while ( false == HaCommonUtils.isPortAlreadyInUse ( Integer.parseInt ( slavePort ) ) )
+ {
+ if ( LOG.isDebugEnabled () )
+ {
+ LOG
+ .debug ( "Replication port is not in use for metadata replication, waiting for the port to be opened by standBy server" );
+ }
+ Thread.sleep ( ONE_SECOND );
+ if ( numRetries-- == 0 )
+ {
+ throw new MetadataException ( "Slave metadata DB is not started even after "
+ + ( ONE_SECOND * maxRetries ) + " milliseconds " );
+ }
+ }
+ }
+
+ public void destroy () // best-effort shutdown of the slave-startup thread
+ {
+ if ( null != haStandByServerThread && haStandByServerThread.isAlive () )
+ {
+ haStandByServerThread.interrupt ();
+ }
+ }
+
+ public boolean isServerRunning ()
+ {
+ return isServerRunning;
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/Keeper.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/task/Keeper.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/task/Keeper.java (revision 0)
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.task;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface Keeper // bookkeeping for MapReduce jobs spawned by the active server
+{
+
+ public static final String TASKKEEPER = "TASKKEEPER"; // presumably the HiveRegistry key for the Keeper instance — TODO confirm at call sites
+
+ void record ( String path, Configuration conf ); // persist a marker for a started job
+
+ void remove ( String path, Configuration conf ); // delete the marker for a finished job
+
+ void killAllJobs ( Configuration conf, String jobID ); // NOTE(review): TaskKeeper treats this argument as a root path, not a single job id
+
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/MetadataSyncTask.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/task/MetadataSyncTask.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/task/MetadataSyncTask.java (revision 0)
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.task;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
+import org.apache.hadoop.hive.metastore.util.HaConstants;
+import org.apache.hadoop.hive.metastore.util.HiveConfigReader;
+import org.apache.hadoop.hive.metastore.util.HiveZipUtils;
+
+public class MetadataSyncTask extends Thread // active-server side: serves the zipped metastore DB to connecting standby servers
+{
+
+ private static final int BYTE_SIZE = 1024; // socket copy buffer size in bytes
+
+ private ServerSocket serverSocket; // bound once in run(); closed by closeSocket()
+
+ private static final Log LOG = LogFactory.getLog ( MetadataSyncTask.class.getName () );
+
+ private volatile boolean alive = true; // accept-loop flag; cleared by closeSocket()
+
+ public MetadataSyncTask ( String threadName )
+ {
+ this.setName ( threadName );
+ }
+
+ @Override
+ public void run ()
+ {
+ compressAndWriteZipFileToSocket ();
+ }
+
+ private void compressAndWriteZipFileToSocket ()
+ {
+ Properties properties = HiveConfigReader.getInstance ().getHAConf ();
+ // Bind the server socket once; failure here disables metadata replication.
+ LOG.info ( "Active Hive server is listening for the standby hive connection " );
+ int portNo = Integer.parseInt ( ( String ) properties
+ .getProperty ( HaConstants.SOCKET_PORT ) );
+ try
+ {
+ serverSocket = HaCommonUtils.getServerSocket ( portNo );
+ }
+ catch ( IOException e )
+ {
+ LOG.error ( "Error listening to socket " + portNo
+ + " Replication of metastore_db to Standby Server will not work", e );
+ return;
+ }
+
+ // Accept loop: for each standby connection, zip the DB and stream it over.
+ Socket socket = null;
+ File file = null;
+ FileInputStream fileInputStream = null;
+ OutputStream socketOutputStream = null;
+ String destinationZipFile = null;
+
+ while ( alive )
+ {
+ try
+ {
+ socket = serverSocket.accept (); // blocks until a standby connects (or closeSocket() closes us)
+ LOG.info ( "Standby hive is connected. Compressing metadataDb for sending to the StandBy Server." );
+ String dbPath = HaCommonUtils.getDataBasePath ();
+ destinationZipFile = dbPath + HaConstants.ZIP_EXTENSION;
+ HiveZipUtils.zipFile ( dbPath, destinationZipFile, true );
+ socketOutputStream = socket.getOutputStream ();
+ file = new File ( destinationZipFile );
+ fileInputStream = new FileInputStream ( file );
+ IOUtils.copyBytes(fileInputStream, socketOutputStream, BYTE_SIZE, false); // false: streams are closed in the finally block below
+ socketOutputStream.flush ();
+ LOG
+ .info ( "Compressed and copied metadata db to the StandBy Hive Server." );
+ }
+ catch ( Throwable e )
+ {
+ LOG.warn ( "Exception occured while performing socket Operations", e ); // includes the SocketException raised when closeSocket() runs
+ }
+ finally
+ {
+ IOUtils.closeSocket ( socket );
+ IOUtils.closeStream ( fileInputStream );
+ IOUtils.closeStream ( socketOutputStream );
+ // Delete the temporary zip only after it has been fully streamed out.
+ try
+ {
+ if ( null != file )
+ {
+ boolean success = file.delete ();
+ if ( success )
+ {
+ if ( LOG.isDebugEnabled () )
+ {
+ LOG
+ .debug ( "zip file deleted successfully : "
+ + destinationZipFile );
+ }
+ }
+ else
+ {
+ LOG.warn ( "Unable to delete zip file " + destinationZipFile );
+ }
+ }
+ }
+ catch ( Exception e )
+ {
+ LOG.warn ( "Unable to delete zip file " + destinationZipFile, e );
+ }
+ }
+ }
+ }
+
+ public void closeSocket () // stops the accept loop and releases the listening port
+ {
+ alive = false;
+ try
+ {
+ if ( null != serverSocket )
+ {
+ serverSocket.close (); // also unblocks a pending accept() in the loop above
+ LOG.info ( "ServerSocket Closed Successfully" );
+ }
+ }
+ catch ( IOException exec )
+ {
+ LOG.warn ( exec );
+ }
+ }
+
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/task/TaskKeeper.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/task/TaskKeeper.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/task/TaskKeeper.java (revision 0)
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.task;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+
+public class TaskKeeper implements Keeper // tracks running MR jobs as marker files on the filesystem so a failover can kill them
+{
+ private static final Log LOG = LogFactory.getLog ( TaskKeeper.class.getName () );
+
+ public void record ( String pathToCreate, Configuration conf ) // creates an empty marker file named after the job id; failures are logged, not thrown
+ {
+ Path path = new Path ( HiveConf.ConfVars.HIVEJOBSPATH.defaultVal + pathToCreate );
+ FSDataOutputStream os = null;
+ FileSystem fs;
+ try
+ {
+ fs = path.getFileSystem ( conf );
+ os = fs.create ( path );
+ }
+ catch ( Throwable e )
+ {
+ LOG.warn ( "Exception while creating job record for jobId : " + pathToCreate, e );
+ }
+ finally
+ {
+ IOUtils.cleanup ( LOG, os );
+ }
+ }
+
+ public void remove ( String pathToRemove, Configuration conf ) // deletes the job's marker file; failures are logged, not thrown
+ {
+ Path path = new Path ( HiveConf.ConfVars.HIVEJOBSPATH.defaultVal + pathToRemove );
+ try
+ {
+ deleteFile ( conf, path );
+ }
+ catch ( Throwable e )
+ {
+ LOG.warn ( "Exception while removing job record for jobId : " + pathToRemove, e );
+ }
+ }
+
+ private void deleteFile ( Configuration conf, Path path ) throws IOException
+ {
+ FileSystem fs = path.getFileSystem ( conf );
+ fs.delete ( path, false ); // non-recursive: markers are plain files
+ }
+
+ public void killAllJobs ( Configuration conf, String rootPath ) // kills every job recorded under rootPath, then removes the whole marker tree
+ {
+ try
+ {
+ FileStatus [] childNodes = getChildNodes ( conf, rootPath ); // one child per recorded job id
+ killAllRunningJobs ( conf, childNodes );
+ removeAllNodes ( conf, rootPath );
+ }
+ catch ( Throwable e )
+ {
+ LOG.warn ( "Exception while killing jobs spawned by previously active process.", e );
+ }
+ }
+
+ private void removeAllNodes ( Configuration conf, String rootPath ) throws IOException
+ {
+ Path path = new Path ( rootPath );
+ FileSystem fileSystem = path.getFileSystem ( conf );
+ fileSystem.delete ( path, true ); // recursive: removes the root and all markers beneath it
+ }
+
+ private void killAllRunningJobs ( Configuration conf, FileStatus [] childNodes ) throws IOException
+ {
+ String jobId = null;
+ if ( null != childNodes )
+ {
+ JobClient jobClient = new JobClient ( new JobConf ( conf ) );
+ if ( null != jobClient ) // NOTE(review): always true after 'new'; check is redundant
+ {
+ try
+ {
+ for ( FileStatus node : childNodes )
+ {
+ jobId = node.getPath ().getName (); // marker file name IS the job id
+ kill ( jobClient, jobId );
+ }
+ }
+ finally
+ {
+ jobClient.close ();
+ }
+ }
+ }
+ }
+
+ private void kill ( JobClient jobClient, String jobId ) // best-effort kill of one job; failures are logged so the loop continues
+ {
+ RunningJob job;
+ try
+ {
+ job = jobClient.getJob ( JobID.forName ( jobId ) );
+ if ( null != job ) // null when the tracker no longer knows this id
+ {
+ job.killJob ();
+ }
+ }
+ catch ( Throwable e )
+ {
+ LOG.warn ( "Exception while killing job : " + jobId, e );
+ }
+ }
+
+ private FileStatus [] getChildNodes ( Configuration conf, String rootPath ) throws IOException
+ {
+ Path path = new Path ( rootPath );
+ FileSystem fileSystem = path.getFileSystem ( conf );
+ return fileSystem.listStatus ( path ); // may be null if rootPath does not exist; handled by the null check in killAllRunningJobs
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HaCommonUtils.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/util/HaCommonUtils.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HaCommonUtils.java (revision 0)
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.management.ObjectName;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hive.metastore.jmx.JmxInvocationHandler;
+import org.apache.hadoop.hive.metastore.active.ActiveHiveServerImpl;
+import org.apache.zk.leaderelection.manager.ProcessManager;
+import org.apache.zk.leaderelection.jmx.JMXService;
+import org.apache.zk.leaderelection.jmx.JMXServiceFactory;
+
+public class HaCommonUtils
+{
+ private static final String SLAVE_SHUTDOWN_SUCCESS = "XRE42";
+
+ private static final Log LOG = LogFactory.getLog ( HaCommonUtils.class.getName () );
+
+ public static final ReadWriteLock lock = new ReentrantReadWriteLock ( true );
+
+ private static final int SOCKET_TIMEOUT = getIntegerPropertyValue (
+ HaConstants.SOCKET_SO_TIMEOUT, HaConstants.DEFAULT_SOCKET_TIMEOUT );
+
+ private static int EXIT_CODE = -1;
+
+ public static String getLocalIPAddress () throws UnknownHostException
+ {
+ return InetAddress.getLocalHost ().getHostAddress ();
+ }
+
+ /**
+ * @return
+ */
+ public static String getDataBasePath ()
+ {
+ return getDerbyLocalPath () + File.separator + HiveConfigReader.getInstance ().getDbName ();
+ }
+
+ private static String getDerbyLocalPath ()
+ {
+ String path = System.getProperty ( HaConstants.DERBY_HOME );
+ if ( StringUtils.isEmpty ( path ) )
+ {
+ path = System.getProperty ( HaConstants.USER_DIR );
+ }
+ return path;
+ }
+
+ /**
+ * @param port
+ * @return
+ */
+ public static boolean isPortAlreadyInUse ( int port )
+ {
+ try
+ {
+ ServerSocket srv = new ServerSocket ( port );
+ srv.close ();
+ srv = null;
+ return false;
+
+ }
+ catch ( IOException e )
+ {
+ return true;
+ }
+ }
+
+ /**
+ * @param activeServerIP
+ * @param portNo
+ * @return
+ * @throws UnknownHostException
+ * @throws IOException
+ */
+ public static Socket getClientSocket ( String activeServerIP, int portNo ) throws IOException
+ {
+ Socket socket = new Socket ( activeServerIP, portNo );
+ socket.setSoTimeout ( SOCKET_TIMEOUT );
+ return socket;
+ }
+
+ /**
+ * @param connectionString
+ * @return
+ * @throws SQLException
+ */
+ public static Connection getConnection ( String connectionString ) throws SQLException
+ {
+ return DriverManager.getConnection ( connectionString );
+ }
+
+ /**
+ * @param propertyName
+ * @param defaultValue
+ * @return
+ */
+ public static long getNumericPropertyValue ( String propertyName, long defaultValue )
+ {
+ Properties properties = HiveConfigReader.getInstance ().getHAConf ();
+ if ( null == properties )
+ {
+ // cover the scenario [Hive is started in Non HA mode ]
+ return defaultValue;
+ }
+
+ String intializationTime = properties.getProperty ( propertyName );
+ if ( null != intializationTime && 0 != intializationTime.trim ().length () )
+ {
+ try
+ {
+ long parseLong = Long.parseLong ( intializationTime );
+ if ( parseLong >= 0 )
+ {
+ return parseLong;
+ }
+ }
+ catch ( NumberFormatException e )
+ {
+ return defaultValue;
+ }
+ }
+ return defaultValue;
+ }
+
+ public static int getIntegerPropertyValue ( String propertyName, int defaultValue )
+ {
+ Properties properties = HiveConfigReader.getInstance ().getHAConf ();
+ if ( null == properties )
+ {
+ // cover the scenario [Hive is started in Non HA mode ]
+ return defaultValue;
+ }
+ String intializationTime = properties.getProperty ( propertyName );
+ if ( null != intializationTime && 0 != intializationTime.trim ().length () )
+ {
+ try
+ {
+ int intValue = Integer.parseInt ( intializationTime );
+ if ( intValue >= 0 )
+ {
+ return intValue;
+ }
+ }
+ catch ( NumberFormatException e )
+ {
+ return defaultValue;
+ }
+ }
+ return defaultValue;
+ }
+
+ /**
+ *
+ */
+ public static void stopSlaveDerby ()
+ {
+ try
+ {
+ StringBuilder builder = new StringBuilder ();
+ builder.append ( "jdbc:derby:" );
+ builder.append ( HiveConfigReader.getInstance ().getDbName () );
+ builder.append ( ";stopSlave=true" );
+ getConnection ( builder.toString () );
+ }
+ catch ( SQLException e )
+ {
+ if ( SLAVE_SHUTDOWN_SUCCESS.equals ( e.getSQLState () ) )
+ {
+ LOG.info ( "[STANDBY SERVER] Derby metadata replication stopped successfully." );
+ }
+ else
+ {
+ LOG.warn ( "[STANDBY SERVER]Failed to stop metadata replication", e );
+ }
+ }
+ catch ( Throwable e )
+ {
+ LOG.warn ( "[STANDBY SERVER]Failed to stop metadata replication", e );
+ }
+ }
+
+ /**
+ * @param portNo
+ * @return
+ * @throws IOException
+ */
+ public static ServerSocket getServerSocket ( int portNo ) throws IOException
+ {
+ return new ServerSocket ( portNo );
+ }
+
+ /**
+ * @param config
+ * @param data
+ * @return
+ * @throws Exception
+ */
+ public static void initializeProcess ( HashMap < String, String > config,
+ HashMap < String, String > data ) throws Exception
+ {
+ new ProcessManager ( config, data ).start ();
+ }
+
+ public static void closeConnection ( Connection conn )
+ {
+ if ( null != conn )
+ {
+ try
+ {
+ conn.close ();
+ }
+ catch ( Exception e )
+ {
+ LOG.warn ( "Unable to close the connection", e );
+ }
+ }
+ }
+
+ public static void registerMBean () throws Exception
+ {
+ JMXService service = JMXServiceFactory.getInstance ().getJMXService ();
+ service.registerMBean ( new JmxInvocationHandler ( new ActiveHiveServerImpl () ),
+ new ObjectName ( HaConstants.MBEAN_NAME ) );
+ }
+
+ public static void unRegisterMBean ()
+ {
+ JMXService service = JMXServiceFactory.getInstance ().getJMXService ();
+ try
+ {
+ service.unRegisterMBean ( new ObjectName ( HaConstants.MBEAN_NAME ) );
+ }
+ catch ( Exception e )
+ {
+ LOG.warn ( "Unable to unregister MBean ", e );
+ }
+ }
+
+ public static void sleepTimeOut ( long timeOut )
+ {
+ try
+ {
+ Thread.sleep ( timeOut );
+ }
+ catch ( InterruptedException e )
+ {
+ // ignore.
+ }
+ }
+
+ public static void terminate ()
+ {
+ System.exit ( EXIT_CODE );
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HaConstants.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/util/HaConstants.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HaConstants.java (revision 0)
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.util;
+
+public class HaConstants
+{
+ public static final String ZIP_EXTENSION = ".zip";
+
+ /** Service name registered with the HA process manager (see HaHiveServer). */
+ public static final String JMX_SERVICE_NAME = "hive";
+
+ /** Embedded Derby JDBC driver class; loaded eagerly by HaHiveServer. */
+ public static final String HIVE_EMBEDDED_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
+
+ public static final String OPERATION_RESULT = "result";
+
+ /** Key for the metastore DB name, resolved from hive-default.xml. */
+ public static final String DATABASE_NAME = "databaseName";
+
+ /** System property used as fallback location for the Derby database. */
+ public static final String USER_DIR = "user.dir";
+
+ // --- ha-hive.properties keys (validated in HiveConfigReader) ---
+
+ public static final String JMX_PORT = "ha.jmx.port";
+
+ public static final String SOCKET_PORT = "hive.ha.socket.port";
+
+ public static final String IP_ADDRESS = "hive.ha.ipaddress";
+
+ public static final String SLAVE_PORT = "hive.ha.slave.port";
+
+ /** Name of the HA configuration file read at start-up. */
+ public static final String HA_HIVE_PROPERTIES = "ha-hive.properties";
+
+ /** Prefix of the ephemeral node created for each hive server instance. */
+ public static final String HIVE_SERVER_EPHEMERAL_PATH_PREFIX = "/hiveserver_";
+
+ public static final String COPY_TIME_OUT = "hive.freeze.timeout";
+
+ // Default for COPY_TIME_OUT -- unit not stated here; presumably seconds. TODO confirm.
+ public static final long COPY_TIME_OUT_DEFAULT = 120;
+
+ public static final String DB_INITIALIZATION_STATUS = "DB_INITIALIZATION_STATUS";
+
+ public static final String JMX_CONNECTOR_PORT = "ha.jmx.connector.server.port";
+
+ public static final String IS_IN_HAMODE = "is.in.hamode";
+
+ public static final String STOP_REPLICATION = "stopReplication";
+
+ public static final String MAX_SLAVE_DB_STARTUP_RETRIES = "max.slave.db.startup.retries";
+
+ public static final String MAX_STOP_REPLICATION_MASTER_TIME = "max.stop.replication.master.time";
+
+ // NOTE(review): value spells "initilization" -- kept as-is because the
+ // runtime key must match existing configuration files.
+ public static final String MAX_DB_INITALIZATION_TIME = "max.db.initilization.time";
+
+ /** Resource read to resolve the metastore database name. */
+ public static final String HIVE_DEFAULT_XML = "hive-default.xml";
+
+ public static final String ACTINE_HIVE_STARTUP_TIME_OUT = "hive.active.server.startup.timeout";
+
+ /** JMX ObjectName of the HA MBean (see HaCommonUtils.registerMBean). */
+ public static final String MBEAN_NAME = "HiveHaMBean:name=haMBean";
+
+ /** Key for the replication socket SO_TIMEOUT in ms (see HaCommonUtils). */
+ public static final String SOCKET_SO_TIMEOUT = "hive.replication.socket.timeout";
+
+ /** Default replication socket timeout: 180 s in milliseconds. */
+ public static final int DEFAULT_SOCKET_TIMEOUT = 180000;
+
+ /** Service path for hive (see HaHiveServer.main). */
+ public static final String HIVE_SERVICE_NAME = "/hive";
+
+ /** System property naming Derby's home directory. */
+ public static final String DERBY_HOME = "derby.system.home";
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveConfigReader.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveConfigReader.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveConfigReader.java (revision 0)
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.util;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+
+public class HiveConfigReader
+{
+ // Appended only so NetUtils.createSocketAddr accepts a bare IP address.
+ private static final String DUMMY_PORT = "9000";
+
+ private static final String COLON = ":";
+
+ // Eagerly created singleton.
+ private static HiveConfigReader instance = new HiveConfigReader ();
+
+ // HA configuration; stays null when Hive runs in non-HA mode.
+ private Properties hAConf;
+
+ // Lazily resolved metastore database name.
+ // NOTE(review): lazy init is not synchronized -- confirm callers are
+ // single-threaded during start-up.
+ private String dbName;
+
+ private HiveConfigReader ()
+ {
+
+ }
+
+ /**
+  * @return the process-wide singleton instance
+  */
+ public static HiveConfigReader getInstance ()
+ {
+ return instance;
+ }
+
+ /**
+  * Stores the HA properties and validates the mandatory entries, throwing
+  * IllegalArgumentException on the first invalid one.
+  *
+  * NOTE(review): properties are stored BEFORE validation, so getHAConf()
+  * can expose an invalid configuration if validation throws -- confirm
+  * this is intended.
+  *
+  * @param properties contents of ha-hive.properties
+  */
+ public void validateLocalProperties ( Properties properties )
+ {
+ this.hAConf = properties;
+
+ validatePortNumber ( properties.getProperty ( HaConstants.JMX_CONNECTOR_PORT ),
+ HaConstants.JMX_CONNECTOR_PORT );
+ validatePortNumber ( properties.getProperty ( HaConstants.JMX_PORT ), HaConstants.JMX_PORT );
+ validatePortNumber ( properties.getProperty ( HaConstants.SLAVE_PORT ),
+ HaConstants.SLAVE_PORT );
+ validatePortNumber ( properties.getProperty ( HaConstants.SOCKET_PORT ),
+ HaConstants.SOCKET_PORT );
+ validateNonEmptyString ( properties.getProperty ( HaConstants.IP_ADDRESS ),
+ HaConstants.IP_ADDRESS );
+ validateIPAddress ( properties.getProperty ( HaConstants.IP_ADDRESS ) );
+ }
+
+ /**
+  * Validates that the configured value resolves to a usable address.
+  * A dummy port is appended because createSocketAddr expects host:port.
+  * Any failure (including the inner IllegalArgumentExceptions, which are
+  * re-wrapped by the catch below) surfaces as IllegalArgumentException.
+  *
+  * @param value the hive.ha.ipaddress value
+  */
+ public void validateIPAddress ( String value )
+ {
+ String initialValue = value;
+ value = value + COLON + DUMMY_PORT;
+ InetSocketAddress createSocketAddr = null;
+ try
+ {
+ createSocketAddr = NetUtils.createSocketAddr ( value );
+ InetAddress address = createSocketAddr.getAddress ();
+ if ( null == address )
+ {
+ throw new IllegalArgumentException ( HaConstants.IP_ADDRESS + " value is null " );
+ }
+ else
+ {
+ // Rejects values whose resolved host address does not appear
+ // in the configured string (lookup does not round-trip).
+ if ( false == value.contains ( address.getHostAddress () ) )
+ {
+ throw new IllegalArgumentException ( HaConstants.IP_ADDRESS
+ + " value is invalid : " + value );
+ }
+ }
+ }
+ catch ( Exception e )
+ {
+ throw new IllegalArgumentException ( HaConstants.IP_ADDRESS + " value is invalid : "
+ + initialValue );
+ }
+ }
+
+ /**
+  * @throws IllegalArgumentException when value is null or blank
+  */
+ public void validateNonEmptyString ( String value, String argName )
+ {
+ if ( null == value )
+ {
+ throw new IllegalArgumentException ( argName + " value is not entered" );
+ }
+ else if ( 0 == value.trim ().length () )
+ {
+ throw new IllegalArgumentException ( argName + " value should not be empty" );
+ }
+ }
+
+ /**
+  * Validates a non-privileged TCP port.
+  *
+  * @throws IllegalArgumentException when portNo is missing, non-numeric,
+  *         or outside [1024, 65535]
+  */
+ public void validatePortNumber ( String portNo, String propertyName )
+ {
+ if ( null == portNo )
+ {
+ throw new IllegalArgumentException ( propertyName + " value is not entered" );
+ }
+ else
+ {
+ try
+ {
+ int parseInt = Integer.parseInt ( portNo );
+ if ( parseInt < 1024 || parseInt > 65535 )
+ {
+ throw new IllegalArgumentException ( propertyName + " value is invalid ["
+ + portNo
+ + "], please enter a valid port number in given range [1024-65535]" );
+ }
+ }
+ catch ( NumberFormatException e )
+ {
+ throw new IllegalArgumentException ( propertyName + " value is invalid [" + portNo
+ + "], please enter a valid port number in the range [1024-65535]" );
+ }
+ }
+ }
+
+ /**
+  * @return the HA properties, or null in non-HA mode
+  */
+ public Properties getHAConf ()
+ {
+ return hAConf;
+ }
+
+ /**
+  * @return the metastore database name, read once from hive-default.xml
+  */
+ public String getDbName ()
+ {
+ if ( null == dbName )
+ {
+ Configuration configuration = new Configuration ();
+ configuration.addResource ( HaConstants.HIVE_DEFAULT_XML );
+ dbName = configuration.get ( HaConstants.DATABASE_NAME );
+ }
+ return dbName;
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveDerbyUtils.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveDerbyUtils.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveDerbyUtils.java (revision 0)
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.util;
+
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class HiveDerbyUtils
+{
+ /** Derby SQL state 08006: single-database shutdown completed. */
+ private static final String SHUTDOWN_DERBY_SUCCESS = "08006";
+
+ private static final Log LOG = LogFactory.getLog ( HiveDerbyUtils.class.getName () );
+
+ /**
+  * Shuts down the embedded Derby database backing the metastore.
+  *
+  * Derby reports a successful shutdown by throwing an SQLException with
+  * SQL state 08006, so that state is logged at INFO (consistent with
+  * HaCommonUtils.stopSlaveDerby -- it was previously logged at WARN);
+  * any other state means the shutdown failed and is logged at WARN.
+  * Nothing is ever thrown to the caller.
+  */
+ public static void shutDownDerby ()
+ {
+ try
+ {
+ HaCommonUtils.getConnection ( "jdbc:derby:"
+ + HiveConfigReader.getInstance ().getDbName () + ";shutdown=true" );
+ }
+ catch ( SQLException e )
+ {
+ // SQL STATE : 08006 - means DB shutdown success in Derby
+ if ( SHUTDOWN_DERBY_SUCCESS.equals ( e.getSQLState () ) )
+ {
+ LOG.info ( "Stopped active derby successfully." );
+ }
+ else
+ {
+ LOG.warn ( "Unable to stop active derby.", e );
+ }
+ }
+ }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveZipUtils.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveZipUtils.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/util/HiveZipUtils.java (revision 0)
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Deque;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.zk.leaderelection.OperationFailedException;
+
+public class HiveZipUtils
+{
+
+ private static final String FORWARD_SLASH = "/";
+
+ private static final Log LOG = LogFactory.getLog ( HiveZipUtils.class.getName () );
+
+ /**
+  * Compresses sourceFolder into destinationZipFile.
+  *
+  * @param sourceFolder       directory to compress
+  * @param destinationZipFile target zip file path
+  * @param overWrite          when true, an existing destination is deleted first
+  */
+ public static void zipFile ( String sourceFolder, String destinationZipFile, boolean overWrite )
+ {
+ File inFolder = new File ( sourceFolder );
+ File outFile = new File ( destinationZipFile );
+ if ( overWrite )
+ {
+ delete ( outFile );
+ }
+ try
+ {
+ zip ( inFolder, outFile );
+ }
+ catch ( IOException e )
+ {
+ LOG.info ( "Unable to compress directory " + sourceFolder, e );
+ }
+ }
+
+ /**
+  * Extracts sourceZipFilePath into destinationFolderPath.
+  *
+  * NOTE(review): extraction only happens when overWrite is true -- with
+  * overWrite == false this method is a no-op. Confirm this is intended.
+  *
+  * @param deleteZipFile when true, the archive is removed after extraction
+  * @throws OperationFailedException when extraction fails
+  */
+ public static void unZipFile ( String sourceZipFilePath, String destinationFolderPath,
+ boolean overWrite, boolean deleteZipFile )
+ {
+ File sourceZipFile = new File ( sourceZipFilePath );
+ File destinationFolder = new File ( destinationFolderPath );
+ if ( overWrite )
+ {
+ delete ( destinationFolder );
+ try
+ {
+ unzip ( sourceZipFile, destinationFolder );
+ }
+ catch ( IOException e )
+ {
+ LOG.error ( "unable to Unzip file, fileName:" + sourceZipFilePath, e );
+ throw new OperationFailedException ( "unable to Unzip file, fileName:"
+ + sourceZipFilePath, e );
+ }
+ if ( deleteZipFile )
+ {
+ delete ( sourceZipFile );
+ }
+ }
+ }
+
+ /**
+  * Recursively deletes a file or directory tree.
+  *
+  * Fix: the previous version removed the files inside a directory but
+  * never the directories themselves, leaving empty trees behind on
+  * overwrite. listFiles() returning null (I/O error) is now tolerated.
+  */
+ private static void delete ( File sourceFolder )
+ {
+ if ( sourceFolder.exists () )
+ {
+ if ( sourceFolder.isDirectory () )
+ {
+ File [] listFiles = sourceFolder.listFiles ();
+ if ( null != listFiles )
+ {
+ for ( File file : listFiles )
+ {
+ delete ( file );
+ }
+ }
+ }
+ // Delete the entry itself (a file, or the now-empty directory).
+ sourceFolder.delete ();
+ }
+ }
+
+ /**
+  * Extracts every entry of zipfile under directory.
+  *
+  * Fixes over the previous version: the ZipFile handle is now closed
+  * (it leaked before), and entry names are validated so an archive
+  * containing ".." components cannot write outside the target
+  * directory ("zip slip").
+  *
+  * @throws IOException on read/write failure or a path-escaping entry
+  */
+ public static void unzip ( File zipfile, File directory ) throws IOException
+ {
+ ZipFile zfile = new ZipFile ( zipfile );
+ try
+ {
+ String basePath = directory.getCanonicalPath () + File.separator;
+ Enumeration < ? extends ZipEntry > entries = zfile.entries ();
+ while ( entries.hasMoreElements () )
+ {
+ ZipEntry entry = entries.nextElement ();
+ File file = new File ( directory, entry.getName () );
+ // Guard against entries such as "../../etc/passwd".
+ if ( false == file.getCanonicalPath ().startsWith ( basePath ) )
+ {
+ throw new IOException ( "Zip entry escapes target directory: "
+ + entry.getName () );
+ }
+ if ( entry.isDirectory () )
+ {
+ file.mkdirs ();
+ }
+ else
+ {
+ file.getParentFile ().mkdirs ();
+ InputStream in = zfile.getInputStream ( entry );
+ try
+ {
+ copy ( in, file );
+ }
+ finally
+ {
+ in.close ();
+ }
+ }
+ }
+ }
+ finally
+ {
+ zfile.close ();
+ }
+ }
+
+ /** Writes the whole stream to file, flushing before close. */
+ private static void copy ( InputStream in, File file ) throws IOException
+ {
+ OutputStream out = new FileOutputStream ( file );
+ try
+ {
+ copy ( in, out );
+ out.flush ();
+ }
+ finally
+ {
+ out.close ();
+ }
+ }
+
+ /**
+  * Recursively zips directory into zipfile. Entry names are relative to
+  * the directory root; directory entries carry a trailing '/'.
+  *
+  * Fix: the old finally block dereferenced zout unconditionally and
+  * would NPE if the ZipOutputStream constructor had thrown; streams
+  * are now closed exactly once on every path.
+  */
+ public static void zip ( File directory, File zipfile ) throws IOException
+ {
+ URI base = directory.toURI ();
+ Deque < File > queue = new LinkedList < File > ();
+ queue.push ( directory );
+ OutputStream out = new FileOutputStream ( zipfile );
+ ZipOutputStream zout = null;
+ try
+ {
+ zout = new ZipOutputStream ( out );
+ while ( false == queue.isEmpty () )
+ {
+ File current = queue.pop ();
+ File [] kids = current.listFiles ();
+ if ( null == kids )
+ {
+ continue;
+ }
+ for ( File kid : kids )
+ {
+ String name = base.relativize ( kid.toURI () ).getPath ();
+ if ( kid.isDirectory () )
+ {
+ queue.push ( kid );
+ name = name.endsWith ( FORWARD_SLASH ) ? name : name + FORWARD_SLASH;
+ zout.putNextEntry ( new ZipEntry ( name ) );
+ }
+ else
+ {
+ zout.putNextEntry ( new ZipEntry ( name ) );
+ copy ( kid, zout );
+ zout.closeEntry ();
+ }
+ }
+ }
+ }
+ finally
+ {
+ // Closing the ZipOutputStream also closes the underlying stream.
+ if ( null != zout )
+ {
+ zout.close ();
+ }
+ else
+ {
+ out.close ();
+ }
+ }
+ }
+
+ /** Copies in to out using a 1 KiB buffer until EOF. */
+ private static void copy ( InputStream in, OutputStream out ) throws IOException
+ {
+ byte [] buffer = new byte [1024];
+ int readCount;
+ while ( ( readCount = in.read ( buffer ) ) >= 0 )
+ {
+ out.write ( buffer, 0, readCount );
+ }
+ }
+
+ /** Copies the contents of file to out, always closing the input stream. */
+ private static void copy ( File file, OutputStream out ) throws IOException
+ {
+ InputStream in = new FileInputStream ( file );
+ try
+ {
+ copy ( in, out );
+ }
+ finally
+ {
+ in.close ();
+ }
+ }
+
+}
Index: ql/src/java/conf/hive-exec-log4j.properties
===================================================================
--- ql/src/java/conf/hive-exec-log4j.properties (revision 1151733)
+++ ql/src/java/conf/hive-exec-log4j.properties (working copy)
@@ -33,6 +33,26 @@
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+#
+# Rolling File Appender - HA Framework
+#
+log4j.logger.haframework=INFO, HA
+log4j.logger.org.I0Itec.zkclient=INFO, HA
+log4j.logger.org.apache.zookeeper=INFO, HA
+
+log4j.appender.HA=org.apache.log4j.RollingFileAppender
+log4j.appender.HA.File=${hive.log.dir}/HA_${hive.log.file}
+
+# Log file size limit; MaxBackupIndex below controls how many rolled files are kept
+log4j.appender.HA.MaxFileSize=5MB
+log4j.appender.HA.MaxBackupIndex=100
+
+log4j.appender.HA.layout=org.apache.log4j.PatternLayout
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.HA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+# Debugging Pattern format (this second assignment overrides the simpler pattern above)
+log4j.appender.HA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
#custom logging levels
#log4j.logger.xxx=DEBUG
Index: service/src/java/org/apache/hadoop/hive/service/HaHiveServer.java
===================================================================
--- service/src/java/org/apache/hadoop/hive/service/HaHiveServer.java (revision 0)
+++ service/src/java/org/apache/hadoop/hive/service/HaHiveServer.java (revision 0)
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
+import org.apache.hadoop.hive.metastore.util.HaConstants;
+import org.apache.hadoop.hive.metastore.util.HiveConfigReader;
+import org.apache.zk.leaderelection.manager.ProcessManagerConstants;
+import org.apache.hadoop.hive.common.PropertyReader;
+import org.apache.hadoop.hive.common.LogUtility;
+
+public class HaHiveServer
+{
+ // Command-line arguments captured at start-up. The "statrUp" spelling is
+ // kept because getStatrUpArgs() is part of the public interface.
+ private static String [] statrUpArgs = new String [] {};
+
+ private static final Log LOG = LogFactory.getLog ( HaHiveServer.class );
+
+ // Load the embedded Derby driver eagerly so the server fails fast when
+ // the metastore backend is unavailable.
+ static
+ {
+ try
+ {
+ Class.forName ( HaConstants.HIVE_EMBEDDED_DRIVER );
+ }
+ catch ( ClassNotFoundException e )
+ {
+ LOG.fatal ( "Unable to load driver : " + HaConstants.HIVE_EMBEDDED_DRIVER, e );
+ throw new ExceptionInInitializerError ( e );
+ }
+ }
+
+ /**
+  * Entry point for the HA Hive server: validates ha-hive.properties,
+  * enriches them with process-manager settings and hands control to the
+  * leader-election ProcessManager. Any start-up failure terminates the
+  * JVM via HaCommonUtils.terminate().
+  */
+ @SuppressWarnings("unchecked")
+ public static void main ( String [] args )
+ {
+ try
+ {
+ LogUtility.initHiveLog4j ();
+ statrUpArgs = args;
+ // Validating the properties before Starting the HiveServer
+ Properties properties = PropertyReader.getProperties ( HaConstants.HA_HIVE_PROPERTIES );
+ HiveConfigReader.getInstance ().validateLocalProperties ( properties );
+
+ properties.put ( ProcessManagerConstants.ZK_EPHE_PATH,
+ HaConstants.HIVE_SERVER_EPHEMERAL_PATH_PREFIX );
+ properties
+ .put ( ProcessManagerConstants.JMX_SERVICE_NAME, HaConstants.JMX_SERVICE_NAME );
+ properties.put ( ProcessManagerConstants.SERVICE_NAME, HaConstants.HIVE_SERVICE_NAME );
+
+ HashMap < String, String > data = getDataProperties ( properties );
+ HashMap < String, String > config = new HashMap < String, String > ( ( Map ) properties );
+ HaCommonUtils.initializeProcess ( config, data );
+ }
+ catch ( Throwable throwable )
+ {
+ LOG.fatal ( "Error occured while starting the Hive process.", throwable );
+ HaCommonUtils.terminate ();
+ }
+ }
+
+ /**
+  * Extracts the per-instance values (IP address, socket port, JMX port)
+  * that are handed to the ProcessManager as instance data.
+  */
+ private static HashMap < String, String > getDataProperties ( Properties hiveProperties )
+ {
+ HashMap < String, String > dataProperties = new HashMap < String, String > ( 3, 1F );
+
+ dataProperties.put ( HaConstants.IP_ADDRESS, hiveProperties
+ .getProperty ( HaConstants.IP_ADDRESS ) );
+ dataProperties.put ( HaConstants.SOCKET_PORT, hiveProperties
+ .getProperty ( HaConstants.SOCKET_PORT ) );
+ dataProperties.put ( HaConstants.JMX_PORT, hiveProperties
+ .getProperty ( HaConstants.JMX_PORT ) );
+ return dataProperties;
+ }
+
+ /**
+  * @return the command-line arguments main() was invoked with
+  */
+ public static String [] getStatrUpArgs ()
+ {
+ return statrUpArgs;
+ }
+}
Index: service/src/java/org/apache/hadoop/hive/service/HiveLRM.java
===================================================================
--- service/src/java/org/apache/hadoop/hive/service/HiveLRM.java (revision 0)
+++ service/src/java/org/apache/hadoop/hive/service/HiveLRM.java (revision 0)
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.service.HiveServer;
+
+import org.apache.hadoop.hive.metastore.task.HiveStandByServer;
+import org.apache.hadoop.hive.metastore.task.MetadataSyncTask;
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
+import org.apache.hadoop.hive.metastore.util.HaConstants;
+import org.apache.hadoop.hive.metastore.util.HiveDerbyUtils;
+import org.apache.hadoop.hive.common.HAThreadGroup;
+import org.apache.zk.leaderelection.manager.LRM;
+import org.apache.zk.leaderelection.OperationFailedException;
+import org.apache.hadoop.hive.service.HaHiveServer;
+import org.apache.hadoop.hive.metastore.task.HiveRegistry;
+import org.apache.hadoop.hive.metastore.task.Keeper;
+import org.apache.hadoop.hive.metastore.task.TaskKeeper;
+
+public class HiveLRM implements LRM
+{
+
+ private static final String HIVE_STARTED_IN_ACTIVE_MODE = "HiveServer started in ACTIVE mode";
+
+ private static final String STARTING_HIVE_IN_ACTIVE = "Starting HiveServer in ACTIVE mode";
+
+ private static final String HIVE_STANDBY_FAILED = "Unable to start HiveServer in STANDBY mode";
+
+ private static final String HIVE_STANDBY_SUCCESS = "HiveServer started STANDBY mode";
+
+ private static final String HIVE_ACTIVE_FAILED = "Unable to start HiveServer in ACTIVE mode";
+
+ private static final String HIVE_STANDBY_TO_ACTIVE = "Starting HiveServer in ACTIVE [STANDBY to ACTIVE]";
+
+ private static final String HIVE_NEUTRAL_FAILED = "Unable to change HiveServer mode to NEUTRAL";
+
+ private static final String HIVE_NEUTRAL_SUCCESS = "HiveServer changed to NEUTRAL mode";
+
+ private static final Log LOG = LogFactory.getLog ( HiveLRM.class.getName () );
+
+ private HiveServer activeServer;
+
+ private HiveStandByServer standByServer;
+
+ private MetadataSyncTask metadataThread;
+
+ private volatile boolean isActive = false;
+
+
+ /*
+ *
+ */
+ public HiveLRM ()
+ {
+ HiveRegistry.register ( Keeper.TASKKEEPER, new TaskKeeper () );
+ // conf.setBoolean(HaConstants.IS_IN_HAMODE, true);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hive.service.HiveLRM#startInActive()
+ */
+ @Override
+ public void startInActive ()
+ {
+ startHiveServerInActiveMode ();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hive.service.HiveLRM#startInStandby(java.util.Map)
+ */
+ @Override
+ public void startInStandby ( Map < String, String > activeServerProps )
+ {
+ startHiveServerInStandByMode ( activeServerProps );
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hive.service.HiveLRM#activeToNeutral()
+ */
+ @Override
+ public void activeToNeutral ()
+ {
+ shutDownHiveServer ( HIVE_NEUTRAL_SUCCESS, HIVE_NEUTRAL_FAILED );
+ HiveDerbyUtils.shutDownDerby ();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hive.service.HiveLRM#standByToActive()
+ */
+ @Override
+ public void standByToActive ()
+ {
+ LOG.info ( HIVE_STANDBY_TO_ACTIVE );
+ ( ( Keeper ) HiveRegistry.getObject ( Keeper.TASKKEEPER ) ).killAllJobs (
+ new Configuration (), HiveConf.ConfVars.HIVEJOBSPATH.defaultVal );
+ stopStandByServer ();
+ startHiveServerInActiveMode ();
+ }
+
+ private void stopStandByServer ()
+ {
+ if ( null != standByServer )
+ {
+ standByServer.destroy ();
+ }
+ HaCommonUtils.stopSlaveDerby ();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hive.service.HiveLRM#neutralToActive()
+ */
+ @Override
+ public void neutralToActive ()
+ {
+ startHiveServerInActiveMode ();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hive.service.HiveLRM#neutralToStandBy(java.util.Map)
+ */
+ @Override
+ public void neutralToStandBy ( Map < String, String > activeServerProps )
+ {
+ startHiveServerInStandByMode ( activeServerProps );
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hive.service.HiveLRM#standByToNeutral()
+ */
+ @Override
+ public void standByToNeutral ()
+ {
+ LOG.info ( "HiveServer state changed from STANDBY to NEUTRAL" );
+ isActive = false;
+ stopStandByServer ();
+ HiveDerbyUtils.shutDownDerby ();
+ }
+
+ protected void startMetadataTask ()
+ {
+ metadataThread = new MetadataSyncTask ( "MetadataSyncTask" );
+ metadataThread.start ();
+ }
+
+ private void startHiveServerInActiveMode ()
+ {
+ ( ( Keeper ) HiveRegistry.getObject ( Keeper.TASKKEEPER ) ).killAllJobs (
+ new Configuration (), HiveConf.ConfVars.HIVEJOBSPATH.defaultVal );
+ try
+ {
+ LOG.info ( STARTING_HIVE_IN_ACTIVE );
+ startMetadataTask ();
+ activeServer = new HiveServer ();
+ spawnAndWait ( HaHiveServer.getStatrUpArgs () );
+
+ // REGISTER JMX MBEAN
+ HaCommonUtils.registerMBean ();
+
+ isActive = true;
+ LOG.info ( HIVE_STARTED_IN_ACTIVE_MODE );
+ }
+ catch ( Throwable e )
+ {
+ isActive = false;
+ LOG.fatal ( HIVE_ACTIVE_FAILED, e );
+ activeServer.destroy ();
+ throw new OperationFailedException ( HIVE_ACTIVE_FAILED, e );
+ }
+ }
+
+ private void startHiveServerInStandByMode ( Map < String, String > activeServerProps )
+ {
+ try
+ {
+ // In either case -- transitioning from NEUTRAL to STANDBY or starting
+ // directly in STANDBY -- the isActive flag is set to false.
+ isActive = false;
+ // INITIALIZE STANDBY SERVER
+ standByServer = new HiveStandByServer ();
+ standByServer.init ( activeServerProps );
+ LOG.info ( HIVE_STANDBY_SUCCESS );
+ }
+ catch ( Throwable e )
+ {
+ LOG.fatal ( HIVE_STANDBY_FAILED, e );
+ stopStandByServer ();
+ throw new OperationFailedException ( HIVE_STANDBY_FAILED, e );
+ }
+ }
+
+ private void shutDownHiveServer ( String msgShutDownSuccess, String msgShutDownFail )
+ {
+ isActive = false;
+ if ( null != activeServer )
+ {
+ if ( activeServer.isServerRunning () )
+ {
+ activeServer.destroy ();
+ cleanUpServerSocket ();
+ LOG.info ( msgShutDownSuccess );
+ }
+ else
+ {
+ LOG.info ( msgShutDownFail );
+ throw new OperationFailedException (
+ "Exception occured while shutting down HiveServer" );
+ }
+ HaCommonUtils.unRegisterMBean ();
+ }
+ else
+ {
+ LOG.info ( msgShutDownSuccess );
+ }
+ }
+
+ private void cleanUpServerSocket ()
+ {
+ if ( null != metadataThread )
+ {
+ metadataThread.closeSocket ();
+ }
+ }
+
+ @Override
+ public boolean isPrimary ()
+ {
+ return isActive;
+ }
+
+ @Override
+ public void stop ()
+ {
+ isActive = false;
+ }
+
+ @Override
+ public void initialize ( Object obj )
+ {
+
+ }
+
+ /**
+ * Starts the HiveServer in a separate thread and waits until it is running.
+ *
+ * @throws Throwable
+ * any exception raised during server startup is rethrown
+ */
+ protected void spawnAndWait ( String [] args ) throws Throwable
+ {
+ // create a threadgroup.
+ HAThreadGroup group = new HAThreadGroup ( "HiveHAStarter" );
+ // Start the thread.
+ Thread thread = new Thread ( group, new HiveStarter ( activeServer, args ) );
+ thread.start ();
+
+ // Wait for the HiveServer to start, or re-throw any exception that was
+ // raised during startup.
+ while ( true )
+ {
+ // check the state if its running.
+ if ( activeServer.isServerRunning () )
+ {
+ break;
+ }
+ else
+ {
+ // check whether any exception was thrown while starting the HiveServer
+ Throwable exception = group.getException ();
+ if ( exception != null )
+ {
+ throw exception;
+ }
+ }
+ // release cpu usage...
+ HaCommonUtils.sleepTimeOut ( 100 );
+ }
+ }
+}
Index: service/src/java/org/apache/hadoop/hive/service/HiveServer.java
===================================================================
--- service/src/java/org/apache/hadoop/hive/service/HiveServer.java (revision 1151733)
+++ service/src/java/org/apache/hadoop/hive/service/HiveServer.java (working copy)
@@ -41,6 +41,7 @@
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.util.HaCommonUtils;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.plan.api.QueryPlan;
@@ -62,6 +63,7 @@
import org.apache.thrift.transport.TTransportFactory;
import com.facebook.fb303.fb_status;
+// NOTE(review): removed accidental auto-import of JDK-internal com.sun.corba API (unused)
/**
* Thrift Hive Server Implementation.
@@ -85,6 +87,13 @@
private static final int DEFAULT_MAX_WORKER_THREADS = Integer.MAX_VALUE;
/**
+ * To know the server status
+ */
+ private volatile boolean isServerRunning = false;
+
+ private TServer server;
+
+ /**
* Handler which implements the Hive Interface This class can be used in lieu
* of the HiveClient class to get an embedded server.
*/
@@ -186,7 +195,12 @@
// case, when calling fetch quueries since execute() has returned.
// For now, we disable the test attempts.
driver.setTryCount(Integer.MAX_VALUE);
- response = driver.run(cmd);
+ try {
+ HaCommonUtils.lock.readLock().lock();
+ response = driver.run(cmd);
+ } finally {
+ HaCommonUtils.lock.readLock().unlock();
+ }
} else {
isHiveQuery = false;
driver = null;
@@ -643,6 +657,10 @@
}
public static void main(String[] args) {
+ new HiveServer().init(args);
+ }
+
+ public void init(String[] args) {
try {
HiveServerCli cli = new HiveServerCli();
@@ -675,7 +693,7 @@
TThreadPoolServer.Options options = new TThreadPoolServer.Options();
options.minWorkerThreads = cli.minWorkerThreads;
options.maxWorkerThreads = cli.maxWorkerThreads;
- TServer server = new TThreadPoolServer(hfactory, serverTransport,
+ server = new TThreadPoolServer(hfactory, serverTransport,
new TTransportFactory(), new TTransportFactory(),
new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), options);
@@ -686,10 +704,28 @@
if (cli.isVerbose()) {
System.err.println(msg);
}
-
+ isServerRunning = true;
server.serve();
} catch (Exception x) {
+ isServerRunning = false;
x.printStackTrace();
}
}
+
+ public boolean isServerRunning() {
+ return isServerRunning;
+ }
+
+ public void setServerRunning(boolean isServerRunning) {
+ this.isServerRunning = isServerRunning;
+ }
+
+ public void destroy() {
+ if (isServerRunning) {
+ if (null != server) {
+ server.stop();
+ }
+ isServerRunning = false;
+ }
+ }
}
Index: service/src/java/org/apache/hadoop/hive/service/HiveStarter.java
===================================================================
--- service/src/java/org/apache/hadoop/hive/service/HiveStarter.java (revision 0)
+++ service/src/java/org/apache/hadoop/hive/service/HiveStarter.java (revision 0)
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+import org.apache.hadoop.hive.service.HiveServer;
+
+/**
+ * Starts the HiveServer in a separate thread, because serve() is a blocking call.
+ */
+public class HiveStarter implements Runnable
+{
+
+ private final HiveServer hiveServer;
+ private final String [] args;
+
+ public HiveStarter ( HiveServer hiveServer, String [] args )
+ {
+ this.args = args;
+ this.hiveServer = hiveServer;
+ }
+
+ @Override
+ public void run ()
+ {
+ try
+ {
+ // Start the HiveServer; in case of any exception, rethrow it as a RuntimeException.
+ hiveServer.init ( args );
+ }
+ catch ( Exception e )
+ {
+ // rethrow the exception and it will be caught by the hive Thread group.
+ throw new RuntimeException ( e );
+ }
+ }
+
+}