diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b459ee3..ed02e30 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1351,6 +1351,28 @@ private static void addDeprecatedKeys() {
public static final long
DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS = 1000;
+ /** Flag to enable recovery of timeline service */
+ public static final String TIMELINE_SERVICE_RECOVERY_ENABLED =
+ TIMELINE_SERVICE_PREFIX + "recovery.enabled";
+ public static final boolean DEFAULT_TIMELINE_SERVICE_RECOVERY_ENABLED = false;
+
+ /** Timeline service state store class */
+ public static final String TIMELINE_SERVICE_STATE_STORE_CLASS =
+ TIMELINE_SERVICE_PREFIX + "state-store-class";
+
+ public static final String TIMELINE_SERVICE_LEVELDB_STATE_STORE_PREFIX =
+ TIMELINE_SERVICE_PREFIX + "leveldb-state-store.";
+
+ /** Timeline service state store leveldb path */
+ public static final String TIMELINE_SERVICE_LEVELDB_STATE_STORE_PATH =
+ TIMELINE_SERVICE_LEVELDB_STATE_STORE_PREFIX + "path";
+
+ /** Timeline service state store leveldb cache (uncompressed blocks) */
+ public static final String TIMELINE_SERVICE_LEVELDB_STATE_STORE_CACHE_SIZE =
+ TIMELINE_SERVICE_LEVELDB_STATE_STORE_PREFIX + "cache-size";
+ public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_STATE_STORE_CACHE_SIZE =
+ 1024;
+
// ///////////////////////////////
// Shared Cache Configs
// ///////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1e7d544..0d3a172 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1359,6 +1359,32 @@
1000
+
+ Enable timeline server to recover state after starting. If
+ true, then yarn.timeline-service.state-store-class must be specified.
+
+ yarn.timeline-service.recovery.enabled
+ false
+
+
+
+ Store class name for timeline state store.
+ yarn.timeline-service.state-store-class
+ org.apache.hadoop.yarn.server.timeline.recovery.LeveldbTimelineServiceStateStoreService
+
+
+
+ Store file name for leveldb state store.
+ yarn.timeline-service.leveldb-state-store.path
+ ${hadoop.tmp.dir}/yarn/timeline
+
+
+
+ Cache size of uncompressed blocks for leveldb state store in bytes.
+ yarn.timeline-service.leveldb-state-store.read-size
+ 1024
+
+
Whether the shared cache is enabled
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
index c7e305c..3524605 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -106,9 +106,8 @@ protected void serviceStart() throws Exception {
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie);
}
-
- startWebApp();
super.serviceStart();
+ startWebApp();
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
index c4ea996..7e810f9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
@@ -330,7 +330,8 @@ synchronized void returnLock(CountingReentrantLock lock) {
}
}
- private static class KeyBuilder {
+ @Private
+ public static class KeyBuilder {
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b;
private boolean[] useSeparator;
@@ -632,7 +633,8 @@ public int compare(byte[] o1, byte[] o2) {
/**
* Returns true if the byte array begins with the specified prefix.
*/
- private static boolean prefixMatches(byte[] prefix, int prefixlen,
+ @Private
+ public static boolean prefixMatches(byte[] prefix, int prefixlen,
byte[] b) {
if (b.length < prefixlen) {
return false;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/LeveldbTimelineServiceStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/LeveldbTimelineServiceStateStoreService.java
new file mode 100644
index 0000000..ba75111
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/LeveldbTimelineServiceStateStoreService.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.recovery;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore.KeyBuilder;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+
+/**
+ * A timeline service state storage implementation that supports any persistent
+ * storage that adheres to the LevelDB interface.
+ */
+public class LeveldbTimelineServiceStateStoreService extends
+ TimelineServiceStateStoreService {
+
+ public static final Log LOG =
+ LogFactory.getLog(LeveldbTimelineServiceStateStoreService.class);
+
+ private static final String FILENAME = "leveldb-state-store.ldb";
+ private static final FsPermission LEVELDB_DIR_UMASK = FsPermission
+ .createImmutable((short) 0700);
+
+ private static final byte[] TOKEN_ENTRY_PREFIX = "t".getBytes();
+ private static final byte[] TOKEN_MASTER_KEY_ENTRY_PREFIX = "k".getBytes();
+
+ private DB db;
+
+ public LeveldbTimelineServiceStateStoreService() {
+ super(LeveldbTimelineServiceStateStoreService.class.getName());
+ }
+
+ @Override
+ protected void initStorage(Configuration conf) throws IOException {
+ Options options = new Options();
+ options.createIfMissing(true);
+ options
+ .cacheSize(conf
+ .getLong(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_STATE_STORE_CACHE_SIZE,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_STATE_STORE_CACHE_SIZE));
+ JniDBFactory factory = new JniDBFactory();
+ Path dbPath =
+ new Path(
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_STATE_STORE_PATH),
+ FILENAME);
+ FileSystem localFS = null;
+ try {
+ localFS = FileSystem.getLocal(conf);
+ if (!localFS.exists(dbPath)) {
+ if (!localFS.mkdirs(dbPath)) {
+ throw new IOException("Couldn't create directory for leveldb " +
+ "timeline store " + dbPath);
+ }
+ localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, localFS);
+ }
+ LOG.info("Using leveldb path " + dbPath);
+ db = factory.open(new File(dbPath.toString()), options);
+ }
+
+ @Override
+ protected void startStorage() throws IOException {
+ }
+
+ @Override
+ protected void closeStorage() throws IOException {
+ IOUtils.cleanup(LOG, db);
+ }
+
+ @Override
+ public TimelineServiceState loadState() throws IOException {
+ LOG.info("Loading timeline service state from leveldb");
+ TimelineServiceState state = new TimelineServiceState();
+ int numKeys = loadTokenMasterKeys(state);
+ int numTokens = loadTokens(state);
+ LOG.info("Loaded " + numKeys + " master keys and " + numTokens
+ + " tokens from leveldb");
+ return state;
+ }
+
+ @Override
+ public void storeToken(TimelineDelegationTokenIdentifier tokenId,
+ Long renewDate) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing token " + tokenId.getSequenceNumber());
+ }
+ byte[] k = createTokenEntryKey(tokenId.getSequenceNumber());
+ if (db.get(k) != null) {
+ throw new IOException(tokenId + " already exists");
+ }
+ byte[] v = buildTokenData(tokenId, renewDate);
+ db.put(k, v);
+ }
+
+ @Override
+ public void updateToken(TimelineDelegationTokenIdentifier tokenId,
+ Long renewDate) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating token " + tokenId.getSequenceNumber());
+ }
+ byte[] k = createTokenEntryKey(tokenId.getSequenceNumber());
+ if (db.get(k) == null) {
+ throw new IOException(tokenId + " doesn't exist");
+ }
+ byte[] v = buildTokenData(tokenId, renewDate);
+ db.put(k, v);
+ }
+
+ @Override
+ public void removeToken(TimelineDelegationTokenIdentifier tokenId)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing token " + tokenId.getSequenceNumber());
+ }
+ byte[] key = createTokenEntryKey(tokenId.getSequenceNumber());
+ db.delete(key);
+ }
+
+ @Override
+ public void storeTokenMasterKey(DelegationKey key) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing master key " + key.getKeyId());
+ }
+ byte[] k = createTokenMasterKeyEntryKey(key.getKeyId());
+ if (db.get(k) != null) {
+ throw new IOException(key + " already exists");
+ }
+ byte[] v = buildTokenMasterKeyData(key);
+ db.put(k, v);
+ }
+
+ @Override
+ public void removeTokenMasterKey(DelegationKey key) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing master key " + key.getKeyId());
+ }
+ byte[] k = createTokenMasterKeyEntryKey(key.getKeyId());
+ db.delete(k);
+ }
+
+ private static byte[] buildTokenData(
+ TimelineDelegationTokenIdentifier tokenId, Long renewDate)
+ throws IOException {
+ ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+ DataOutputStream dataStream = new DataOutputStream(memStream);
+ try {
+ dataStream.writeLong(renewDate);
+ tokenId.write(dataStream);
+ dataStream.close();
+ } finally {
+ IOUtils.cleanup(LOG, dataStream);
+ }
+ return memStream.toByteArray();
+ }
+
+ private static byte[] buildTokenMasterKeyData(DelegationKey key)
+ throws IOException {
+ ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+ DataOutputStream dataStream = new DataOutputStream(memStream);
+ try {
+ key.write(dataStream);
+ dataStream.close();
+ } finally {
+ IOUtils.cleanup(LOG, dataStream);
+ }
+ return memStream.toByteArray();
+ }
+
+ private static void loadTokenMasterKeyData(TimelineServiceState state,
+ byte[] keyData)
+ throws IOException {
+ DelegationKey key = new DelegationKey();
+ DataInputStream in =
+ new DataInputStream(new ByteArrayInputStream(keyData));
+ try {
+ key.readFields(in);
+ } finally {
+ IOUtils.cleanup(LOG, in);
+ }
+ state.tokenMasterKeyState.add(key);
+ }
+
+ private static void
+ loadTokenData(TimelineServiceState state, byte[] tokenData)
+ throws IOException {
+ TimelineDelegationTokenIdentifier tokenId =
+ new TimelineDelegationTokenIdentifier();
+ long renewDate;
+ DataInputStream in =
+ new DataInputStream(new ByteArrayInputStream(tokenData));
+ try {
+ renewDate = in.readLong();
+ tokenId.readFields(in);
+ } finally {
+ IOUtils.cleanup(LOG, in);
+ }
+ state.tokenState.put(tokenId, renewDate);
+ }
+
+ private int loadTokenMasterKeys(TimelineServiceState state) throws IOException {
+ byte[] base = KeyBuilder.newInstance().add(TOKEN_MASTER_KEY_ENTRY_PREFIX)
+ .getBytesForLookup();
+ int numKeys = 0;
+ DBIterator iterator = null;
+ try {
+ for (iterator = db.iterator(), iterator.seek(base); iterator.hasNext(); iterator
+ .next()) {
+ byte[] k = iterator.peekNext().getKey();
+ if (!LeveldbTimelineStore.prefixMatches(base, base.length, k)) {
+ break;
+ }
+ byte[] v = iterator.peekNext().getValue();
+ loadTokenMasterKeyData(state, v);
+ ++numKeys;
+ }
+ } finally {
+ IOUtils.cleanup(LOG, iterator);
+ }
+ return numKeys;
+ }
+
+ private int loadTokens(TimelineServiceState state) throws IOException {
+ byte[] base = KeyBuilder.newInstance().add(TOKEN_ENTRY_PREFIX)
+ .getBytesForLookup();
+ int numTokens = 0;
+ DBIterator iterator = null;
+ try {
+ for (iterator = db.iterator(), iterator.seek(base); iterator.hasNext(); iterator
+ .next()) {
+ byte[] k = iterator.peekNext().getKey();
+ if (!LeveldbTimelineStore.prefixMatches(base, base.length, k)) {
+ break;
+ }
+ byte[] v = iterator.peekNext().getValue();
+ loadTokenData(state, v);
+ ++numTokens;
+ }
+ } finally {
+ IOUtils.cleanup(LOG, iterator);
+ }
+ return numTokens;
+ }
+
+ /**
+ * Creates a domain entity key with column name suffix, of the form
+ * TOKEN_ENTRY_PREFIX + sequence number.
+ */
+ private static byte[] createTokenEntryKey(int seqNum) throws IOException {
+ return KeyBuilder.newInstance().add(TOKEN_ENTRY_PREFIX)
+ .add(Integer.toString(seqNum)).getBytes();
+ }
+
+ /**
+ * Creates a domain entity key with column name suffix, of the form
+ * TOKEN_MASTER_KEY_ENTRY_PREFIX + sequence number.
+ */
+ private static byte[] createTokenMasterKeyEntryKey(int keyId) throws IOException {
+ return KeyBuilder.newInstance().add(TOKEN_MASTER_KEY_ENTRY_PREFIX)
+ .add(Integer.toString(keyId)).getBytes();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/MemoryTimelineServerStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/MemoryTimelineServerStateStoreService.java
new file mode 100644
index 0000000..b052c88
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/MemoryTimelineServerStateStoreService.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+
+/**
+ * A state store backed by memory for unit tests
+ */
+class MemroyTimelineServiceStateStoreService
+ extends TimelineServiceStateStoreService {
+
+ private TimelineServiceState state;
+
+ @Override
+ protected void initStorage(Configuration conf) throws IOException {
+ }
+
+ @Override
+ protected void startStorage() throws IOException {
+ state = new TimelineServiceState();
+ }
+
+ @Override
+ protected void closeStorage() throws IOException {
+ state = null;
+ }
+
+ @Override
+ public TimelineServiceState loadState() throws IOException {
+ TimelineServiceState result = new TimelineServiceState();
+ result.tokenState.putAll(state.tokenState);
+ result.tokenMasterKeyState.addAll(state.tokenMasterKeyState);
+ return result;
+ }
+
+ @Override
+ public void storeToken(TimelineDelegationTokenIdentifier tokenId,
+ Long renewDate)
+ throws IOException {
+ if (state.tokenState.containsKey(tokenId)) {
+ throw new IOException("token " + tokenId + " was stored twice");
+ }
+ state.tokenState.put(tokenId, renewDate);
+ }
+
+ @Override
+ public void updateToken(TimelineDelegationTokenIdentifier tokenId,
+ Long renewDate)
+ throws IOException {
+ if (!state.tokenState.containsKey(tokenId)) {
+ throw new IOException("token " + tokenId + " not in store");
+ }
+ state.tokenState.put(tokenId, renewDate);
+ }
+
+ @Override
+ public void removeToken(TimelineDelegationTokenIdentifier tokenId)
+ throws IOException {
+ state.tokenState.remove(tokenId);
+ }
+
+ @Override
+ public void storeTokenMasterKey(DelegationKey key) throws IOException {
+ if (state.tokenMasterKeyState.contains(key)) {
+ throw new IOException("token master key " + key + " was stored twice");
+ }
+ state.tokenMasterKeyState.add(key);
+ }
+
+ @Override
+ public void removeTokenMasterKey(DelegationKey key) throws IOException {
+ state.tokenMasterKeyState.remove(key);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/TimelineServiceStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/TimelineServiceStateStoreService.java
new file mode 100644
index 0000000..9bc9df8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/recovery/TimelineServiceStateStoreService.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.recovery;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+
+@Private
+@Unstable
+/**
+ * Base class for timeline service state storage.
+ * Storage implementations need to implement blocking store and load methods
+ * to actually store and load the state.
+ */
+public abstract class TimelineServiceStateStoreService extends AbstractService {
+
+ public static class TimelineServiceState {
+ Map tokenState =
+ new HashMap();
+ Set tokenMasterKeyState = new HashSet();
+
+ public Map getTokenState() {
+ return tokenState;
+ }
+
+ public Set getTokenMasterKeyState() {
+ return tokenMasterKeyState;
+ }
+ }
+
+ public TimelineServiceStateStoreService() {
+ super(TimelineServiceStateStoreService.class.getName());
+ }
+
+ public TimelineServiceStateStoreService(String name) {
+ super(name);
+ }
+
+ /**
+ * Initialize the state storage
+ *
+ * @param conf the configuration
+ * @throws IOException
+ */
+ @Override
+ public void serviceInit(Configuration conf) throws IOException {
+ initStorage(conf);
+ }
+
+ /**
+ * Start the state storage for use
+ *
+ * @throws IOException
+ */
+ @Override
+ public void serviceStart() throws IOException {
+ startStorage();
+ }
+
+ /**
+ * Shutdown the state storage.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void serviceStop() throws IOException {
+ closeStorage();
+ }
+
+ /**
+ * Implementation-specific initialization.
+ *
+ * @param conf the configuration
+ * @throws IOException
+ */
+ protected abstract void initStorage(Configuration conf) throws IOException;
+
+ /**
+ * Implementation-specific startup.
+ *
+ * @throws IOException
+ */
+ protected abstract void startStorage() throws IOException;
+
+ /**
+ * Implementation-specific shutdown.
+ *
+ * @throws IOException
+ */
+ protected abstract void closeStorage() throws IOException;
+
+ /**
+ * Load the timeline service state from the state storage.
+ *
+ * @throws IOException
+ */
+ public abstract TimelineServiceState loadState() throws IOException;
+
+ /**
+ * Blocking method to store a delegation token along with the current token
+ * sequence number to the state storage.
+ *
+ * Implementations must not return from this method until the token has been
+ * committed to the state store.
+ *
+ * @param tokenId the token to store
+ * @param renewDate the token renewal deadline
+ * @throws IOException
+ */
+ public abstract void storeToken(TimelineDelegationTokenIdentifier tokenId,
+ Long renewDate) throws IOException;
+
+ /**
+ * Blocking method to update the expiration of a delegation token
+ * in the state storage.
+ *
+ * Implementations must not return from this method until the expiration
+ * date of the token has been updated in the state store.
+ *
+ * @param tokenId the token to update
+ * @param renewDate the new token renewal deadline
+ * @throws IOException
+ */
+ public abstract void updateToken(TimelineDelegationTokenIdentifier tokenId,
+ Long renewDate) throws IOException;
+
+ /**
+ * Blocking method to remove a delegation token from the state storage.
+ *
+ * Implementations must not return from this method until the token has been
+ * removed from the state store.
+ *
+ * @param tokenId the token to remove
+ * @throws IOException
+ */
+ public abstract void removeToken(TimelineDelegationTokenIdentifier tokenId)
+ throws IOException;
+
+ /**
+ * Blocking method to store a delegation token master key.
+ *
+ * Implementations must not return from this method until the key has been
+ * committed to the state store.
+ *
+ * @param key the master key to store
+ * @throws IOException
+ */
+ public abstract void storeTokenMasterKey(
+ DelegationKey key) throws IOException;
+
+ /**
+ * Blocking method to remove a delegation token master key.
+ *
+ * Implementations must not return from this method until the key has been
+ * removed from the state store.
+ *
+ * @param key the master key to remove
+ * @throws IOException
+ */
+ public abstract void removeTokenMasterKey(DelegationKey key)
+ throws IOException;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java
index 11a64e6..bd54d66 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java
@@ -18,33 +18,34 @@
package org.apache.hadoop.yarn.server.timeline.security;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.util.Map.Entry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.server.timeline.recovery.LeveldbTimelineServiceStateStoreService;
+import org.apache.hadoop.yarn.server.timeline.recovery.TimelineServiceStateStoreService;
+import org.apache.hadoop.yarn.server.timeline.recovery.TimelineServiceStateStoreService.TimelineServiceState;
/**
* The service wrapper of {@link TimelineDelegationTokenSecretManager}
*/
@Private
@Unstable
-public class TimelineDelegationTokenSecretManagerService extends AbstractService {
+public class TimelineDelegationTokenSecretManagerService extends
+ AbstractService {
private TimelineDelegationTokenSecretManager secretManager = null;
- private InetSocketAddress serviceAddr = null;
+ private TimelineServiceStateStoreService stateStore = null;
public TimelineDelegationTokenSecretManagerService() {
super(TimelineDelegationTokenSecretManagerService.class.getName());
@@ -52,6 +53,12 @@ public TimelineDelegationTokenSecretManagerService() {
@Override
protected void serviceInit(Configuration conf) throws Exception {
+ if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_RECOVERY_ENABLED)) {
+ stateStore = createStateStore(conf);
+ stateStore.init(conf);
+ }
+
long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@@ -62,25 +69,47 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
secretManager = new TimelineDelegationTokenSecretManager(secretKeyInterval,
- tokenMaxLifetime, tokenRenewInterval,
- 3600000);
- secretManager.startThreads();
-
- serviceAddr = TimelineUtils.getTimelineTokenServiceAddress(getConfig());
+ tokenMaxLifetime, tokenRenewInterval, 3600000, stateStore);
super.init(conf);
}
@Override
+ protected void serviceStart() throws Exception {
+ if (stateStore != null) {
+ stateStore.start();
+ TimelineServiceState state = stateStore.loadState();
+ secretManager.recover(state);
+ }
+
+ secretManager.startThreads();
+ super.serviceStart();
+ }
+
+ @Override
protected void serviceStop() throws Exception {
+ if (stateStore != null) {
+ stateStore.stop();
+ }
+
secretManager.stopThreads();
super.stop();
}
+ protected TimelineServiceStateStoreService createStateStore(
+ Configuration conf) {
+ return ReflectionUtils.newInstance(
+ conf.getClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
+ LeveldbTimelineServiceStateStoreService.class,
+ TimelineServiceStateStoreService.class), conf);
+ }
+
/**
* Ge the instance of {link #TimelineDelegationTokenSecretManager}
+ *
* @return the instance of {link #TimelineDelegationTokenSecretManager}
*/
- public TimelineDelegationTokenSecretManager getTimelineDelegationTokenSecretManager() {
+ public TimelineDelegationTokenSecretManager
+ getTimelineDelegationTokenSecretManager() {
return secretManager;
}
@@ -101,11 +130,20 @@ public TimelineDelegationTokenSecretManager getTimelineDelegationTokenSecretMana
public static class TimelineDelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager {
- public TimelineDelegationTokenSecretManager(long delegationKeyUpdateInterval,
- long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval) {
+ public static final Log LOG =
+ LogFactory.getLog(TimelineDelegationTokenSecretManager.class);
+
+ private TimelineServiceStateStoreService stateStore;
+
+ public TimelineDelegationTokenSecretManager(
+ long delegationKeyUpdateInterval,
+ long delegationTokenMaxLifetime,
+ long delegationTokenRenewInterval,
+ long delegationTokenRemoverScanInterval,
+ TimelineServiceStateStoreService stateStore) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ this.stateStore = stateStore;
}
@Override
@@ -113,6 +151,89 @@ public TimelineDelegationTokenIdentifier createIdentifier() {
return new TimelineDelegationTokenIdentifier();
}
+ @Override
+ protected void storeNewMasterKey(DelegationKey key) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing master key " + key.getKeyId());
+ }
+ try {
+ if (stateStore != null) {
+ stateStore.storeTokenMasterKey(key);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to store master key " + key.getKeyId(), e);
+ }
+ }
+
+ @Override
+ protected void removeStoredMasterKey(DelegationKey key) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing master key " + key.getKeyId());
+ }
+ try {
+ if (stateStore != null) {
+ stateStore.removeTokenMasterKey(key);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to remove master key " + key.getKeyId(), e);
+ }
+ }
+
+ @Override
+ protected void storeNewToken(TimelineDelegationTokenIdentifier tokenId,
+ long renewDate) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing token " + tokenId.getSequenceNumber());
+ }
+ try {
+ if (stateStore != null) {
+ stateStore.storeToken(tokenId, renewDate);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to store token " + tokenId.getSequenceNumber(), e);
+ }
+ }
+
+ @Override
+ protected void removeStoredToken(TimelineDelegationTokenIdentifier tokenId)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing token " + tokenId.getSequenceNumber());
+ }
+ try {
+ if (stateStore != null) {
+ stateStore.removeToken(tokenId);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to remove token " + tokenId.getSequenceNumber(), e);
+ }
+ }
+
+ @Override
+ protected void updateStoredToken(TimelineDelegationTokenIdentifier tokenId,
+ long renewDate) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating token " + tokenId.getSequenceNumber());
+ }
+ try {
+ if (stateStore != null) {
+ stateStore.updateToken(tokenId, renewDate);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to update token " + tokenId.getSequenceNumber(), e);
+ }
+ }
+
+ public void recover(TimelineServiceState state) throws IOException {
+ LOG.info("Recovering " + getClass().getSimpleName());
+ for (DelegationKey key : state.getTokenMasterKeyState()) {
+ addKey(key);
+ }
+ for (Entry entry :
+ state.getTokenState().entrySet()) {
+ addPersistedDelegationToken(entry.getKey(), entry.getValue());
+ }
+ }
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/recovery/TestLeveldbTimelineServiceStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/recovery/TestLeveldbTimelineServiceStateStoreService.java
new file mode 100644
index 0000000..1d4612b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/recovery/TestLeveldbTimelineServiceStateStoreService.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline.recovery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.timeline.recovery.TimelineServiceStateStoreService.TimelineServiceState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLeveldbTimelineServiceStateStoreService {
+
+ private FileContext fsContext;
+ private File fsPath;
+ private Configuration conf;
+ private TimelineServiceStateStoreService store;
+
+ @Before
+ public void setup() throws Exception {
+ fsPath = new File("target", getClass().getSimpleName() +
+ "-tmpDir").getAbsoluteFile();
+ fsContext = FileContext.getLocalFSFileContext();
+ fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+ conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_RECOVERY_ENABLED, true);
+ conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
+ LeveldbTimelineServiceStateStoreService.class,
+ TimelineServiceStateStoreService.class);
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_STATE_STORE_PATH,
+ fsPath.getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (store != null) {
+ store.stop();
+ }
+ if (fsContext != null) {
+ fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+ }
+ }
+
+ private void initAndStartTimelineServiceStateStoreService() {
+ store = new LeveldbTimelineServiceStateStoreService();
+ store.init(conf);
+ store.start();
+ }
+
+ @Test
+ public void testTokenStore() throws Exception {
+ initAndStartTimelineServiceStateStoreService();
+ TimelineServiceState state = store.loadState();
+ assertTrue("token state not empty", state.tokenState.isEmpty());
+ assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty());
+
+ final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes());
+ final TimelineDelegationTokenIdentifier token1 =
+ new TimelineDelegationTokenIdentifier(new Text("tokenOwner1"),
+ new Text("tokenRenewer1"), new Text("tokenUser1"));
+ token1.setSequenceNumber(1);
+ final Long tokenDate1 = 1L;
+ final TimelineDelegationTokenIdentifier token2 =
+ new TimelineDelegationTokenIdentifier(new Text("tokenOwner2"),
+ new Text("tokenRenewer2"), new Text("tokenUser2"));
+ token2.setSequenceNumber(12345678);
+ final Long tokenDate2 = 87654321L;
+
+ store.storeTokenMasterKey(key1);
+ try {
+ store.storeTokenMasterKey(key1);
+ fail("redundant store of key undetected");
+ } catch (IOException e) {
+ // expected
+ }
+ store.storeToken(token1, tokenDate1);
+ store.storeToken(token2, tokenDate2);
+ try {
+ store.storeToken(token1, tokenDate1);
+ fail("redundant store of token undetected");
+ } catch (IOException e) {
+ // expected
+ }
+ store.close();
+
+ initAndStartTimelineServiceStateStoreService();
+ state = store.loadState();
+ assertEquals("incorrect loaded token count", 2, state.tokenState.size());
+ assertTrue("missing token 1", state.tokenState.containsKey(token1));
+ assertEquals("incorrect token 1 date", tokenDate1,
+ state.tokenState.get(token1));
+ assertTrue("missing token 2", state.tokenState.containsKey(token2));
+ assertEquals("incorrect token 2 date", tokenDate2,
+ state.tokenState.get(token2));
+ assertEquals("incorrect master key count", 1,
+ state.tokenMasterKeyState.size());
+ assertTrue("missing master key 1",
+ state.tokenMasterKeyState.contains(key1));
+
+ final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes());
+ final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes());
+ final TimelineDelegationTokenIdentifier token3 =
+ new TimelineDelegationTokenIdentifier(new Text("tokenOwner3"),
+ new Text("tokenRenewer3"), new Text("tokenUser3"));
+ token3.setSequenceNumber(12345679);
+ final Long tokenDate3 = 87654321L;
+
+ store.removeToken(token1);
+ store.storeTokenMasterKey(key2);
+ final Long newTokenDate2 = 975318642L;
+ store.updateToken(token2, newTokenDate2);
+ store.removeTokenMasterKey(key1);
+ store.storeTokenMasterKey(key3);
+ store.storeToken(token3, tokenDate3);
+ store.close();
+
+ initAndStartTimelineServiceStateStoreService();
+ state = store.loadState();
+ assertEquals("incorrect loaded token count", 2, state.tokenState.size());
+ assertFalse("token 1 not removed", state.tokenState.containsKey(token1));
+ assertTrue("missing token 2", state.tokenState.containsKey(token2));
+ assertEquals("incorrect token 2 date", newTokenDate2,
+ state.tokenState.get(token2));
+ assertTrue("missing token 3", state.tokenState.containsKey(token3));
+ assertEquals("incorrect token 3 date", tokenDate3,
+ state.tokenState.get(token3));
+ assertEquals("incorrect master key count", 2,
+ state.tokenMasterKeyState.size());
+ assertFalse("master key 1 not removed",
+ state.tokenMasterKeyState.contains(key1));
+ assertTrue("missing master key 2",
+ state.tokenMasterKeyState.contains(key2));
+ assertTrue("missing master key 3",
+ state.tokenMasterKeyState.contains(key3));
+ store.close();
+ }
+
+}