diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClientAsync.java new file mode 100644 index 0000000..2266f6c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClientAsync.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientAsyncImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; + +/** + * A client library that can be used to post some information in terms of a + * number of conceptual entities. + */ +@Public +@Unstable +public abstract class TimelineClientAsync extends AbstractService { + private static final Log LOG = LogFactory.getLog(TimelineClientAsync.class); + protected TimelineClient client; + protected CallbackHandler callbackHandler; + + @Public + public static TimelineClientAsync createTimelineClient( + CallbackHandler callbackHandler) { + TimelineClientAsync client = new TimelineClientAsyncImpl(callbackHandler); + return client; + } + + @Private + protected TimelineClientAsync(String name, TimelineClient client, + CallbackHandler callbackHandler) { + super(name); + this.setClient(client); + this.setCallbackHandler(callbackHandler); + } + + public TimelineClient getClient() { + return client; + } + + public void setClient(TimelineClient client) { + this.client = client; + } + + public CallbackHandler getCallbackHandler() { + return callbackHandler; + } + + public void setCallbackHandler(CallbackHandler callbackHandler) { + this.callbackHandler = callbackHandler; + } + + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * server. It is a non blocking API. + *

+ * + * @param entities + * the collection of {@link TimelineEntity} + * @return the error information if the sent entities are not correctly stored + * @throws InterruptedException + */ + @Public + public abstract void putEntities(TimelineEntity... entities) + throws InterruptedException; + + /** + *

+ * Get a delegation token so as to be able to talk to the timeline server in a + * secure way. It is blocking API. The method will not return until it gets + * the response from the timeline server. + *

+ * + * @param renewer + * Address of the renewer who can renew these tokens when needed by + * securely talking to the timeline server + * @return a delegation token ({@link Token}) that can be used to talk to the + * timeline server + * @throws IOException + * @throws YarnException + */ + @Public + public abstract Token + getDelegationToken(String renewer) throws IOException, YarnException; + + /** + * Wait for check to return true for each 1000 ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int)} + * and {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check + */ + public void waitFor(Supplier check) throws InterruptedException { + waitFor(check, 1000); + } + + /** + * Wait for check to return true for each + * checkEveryMillis ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check user defined checker + * @param checkEveryMillis interval to call check + */ + public void waitFor(Supplier check, int checkEveryMillis) + throws InterruptedException { + waitFor(check, checkEveryMillis, 1); + }; + + /** + * Wait for check to return true for each + * checkEveryMillis ms. In the main loop, this method will log + * the message "waiting in main loop" for each logInterval times + * iteration to confirm the thread is alive. + * @param check user defined checker + * @param checkEveryMillis interval to call check + * @param logInterval interval to log for each + */ + public void waitFor(Supplier check, int checkEveryMillis, + int logInterval) throws InterruptedException { + Preconditions.checkNotNull(check, "check should not be null"); + Preconditions.checkArgument(checkEveryMillis >= 0, + "checkEveryMillis should be positive value"); + Preconditions.checkArgument(logInterval >= 0, + "logInterval should be positive value"); + + int loggingCounter = logInterval; + do { + if (LOG.isDebugEnabled()) { + LOG.debug("Check the condition for main loop."); + } + + boolean result = check.get(); + if (result) { + LOG.info("Exits the main loop."); + return; + } + if (--loggingCounter <= 0) { + LOG.info("Waiting in main loop."); + loggingCounter = logInterval; + } + + Thread.sleep(checkEveryMillis); + } while (true); + } + + public interface CallbackHandler { + + /** + * Called when the Timeline server responds to a putEntities + * request. + */ + public void onEntitiesPut(TimelinePutResponse response); + + /** + * Called when error comes the timeline server communications. + * Calling stop() is the recommended action. + * + * @param e + */ + public void onError(Throwable e); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientAsyncImpl.java new file mode 100644 index 0000000..58baf08 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientAsyncImpl.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; + +import com.google.common.annotations.VisibleForTesting; + +@Private +@Unstable +public class TimelineClientAsyncImpl extends TimelineClientAsync { + + private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); + + private volatile boolean keepRunning = false; + private final BlockingQueue requestQueue; + private final BlockingQueue responseQueue; + private final RequestDispatcherThread dispatcherThread; + private final CallbackHandlerThread handlerThread; + private volatile Throwable savedException; + private final TimelineEntity[] ENTITY_EMPTY_ARRAY = new TimelineEntity[0]; + + public TimelineClientAsyncImpl(CallbackHandler callbackHandler) { + this(TimelineClientAsync.class.getName(), callbackHandler); + } + + public TimelineClientAsyncImpl(String name, CallbackHandler callbackHandler) { + this(name, TimelineClient.createTimelineClient(), callbackHandler); + } + + @Private + @VisibleForTesting + protected TimelineClientAsyncImpl(String name, TimelineClient client, + CallbackHandler callbackHandler) { + super(name, client, callbackHandler); + requestQueue = new LinkedBlockingQueue(); + responseQueue = new LinkedBlockingQueue(); + keepRunning = true; + dispatcherThread = new RequestDispatcherThread(); + handlerThread = new CallbackHandlerThread(); + } + + protected void serviceInit(Configuration conf) throws Exception { + client.init(conf); + dispatcherThread.setDaemon(false); + handlerThread.setDaemon(false); + super.serviceInit(conf); + } + + protected void serviceStart() throws Exception { + client.start(); + dispatcherThread.start(); + handlerThread.start(); + super.serviceStart(); + } + + protected void serviceStop() throws Exception { + keepRunning = false; + client.stop(); + dispatcherThread.interrupt(); + handlerThread.interrupt(); + super.serviceStart(); + } + + @Override + public void putEntities(TimelineEntity... entities) + throws InterruptedException { + for (TimelineEntity e : entities) { + requestQueue.put(e); + } + } + + @Override + public Token getDelegationToken( + String renewer) throws IOException, YarnException { + return client.getDelegationToken(renewer); + } + + private class RequestDispatcherThread extends Thread { + public RequestDispatcherThread() { + super("TimelineClientAsync request dispatcher thread"); + } + + public void run() { + Collection entities = new ArrayList(); + while (!this.currentThread().isInterrupted() && keepRunning) { + try { + entities.add(requestQueue.take()); + } catch (InterruptedException ex) { + LOG.info("Interrupted while waiting for queue", ex); + continue; + } + requestQueue.drainTo(entities); + + TimelinePutResponse response = null; + try { + response = + client.putEntities(entities.toArray(ENTITY_EMPTY_ARRAY)); + } catch (Throwable ex) { + LOG.error("Exception on putEntities", ex); + savedException = ex; + // interrupt handler thread in case it waiting on the queue + handlerThread.interrupt(); + return; + } + if (response != null) { + while (true) { + try { + responseQueue.put(response); + break; + } catch (InterruptedException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted while waiting to put on response queue", + ex); + } + } + } + } + + } + } + } + + private class CallbackHandlerThread extends Thread { + public CallbackHandlerThread() { + super("TimelineClientAsync callback handler thread"); + } + + public void run() { + while (!this.currentThread().isInterrupted() && keepRunning) { + try { + TimelinePutResponse response = null; + if(savedException != null) { + LOG.error("Stopping callback due to: ", savedException); + callbackHandler.onError(savedException); + return; + } + + try { + response = responseQueue.take(); + } catch (InterruptedException ex) { + LOG.info("Interrupted while waiting for queue", ex); + continue; + } + callbackHandler.onEntitiesPut(response); + + } catch (Throwable ex) { + callbackHandler.onError(ex); + // re-throw exception to end the thread + throw new YarnRuntimeException(ex); + } + } + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientAsync.java new file mode 100644 index 0000000..8e5bf3d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientAsync.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import com.google.common.base.Supplier; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineClientAsync; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.net.ConnectException; + +import org.junit.Assert; + +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; + +public class TestTimelineClientAsync { + private final Log LOG = LogFactory.getLog(TestTimelineClientAsync.class); + + private TimelineClientAsyncImpl asyncClient; + private TestCallbackHandler handler; + + @Before + public void setup() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + handler = new TestCallbackHandler(); + asyncClient = createTimelineClientAsync(conf, handler); + } + + @After + public void tearDown() { + if (asyncClient != null) { + asyncClient.stop(); + } + } + + @Test + public void testPostEntities() throws Exception { + mockClientResponse((TimelineClientImpl)asyncClient.getClient(), + ClientResponse.Status.OK, false, false); + asyncClient.putEntities(generateEntity()); + asyncClient.waitFor(new Supplier() { + @Override + public Boolean get() { + return handler.notifyEntitiesPut; + } + }); + Assert.assertEquals(1, handler.getResponses().size()); + Assert.assertEquals(0, handler.getResponses().get(0).getErrors().size()); + } + + @Test + public void testPostEntitiesWithError() throws Exception { + LOG.info("testPostEntitiesWithError"); + mockClientResponse((TimelineClientImpl) asyncClient.getClient(), + ClientResponse.Status.OK, true, false); + asyncClient.putEntities(generateEntity()); + asyncClient.waitFor(new Supplier() { + @Override + public Boolean get() { + return handler.notifyEntitiesPut; + } + }); + Assert.assertEquals(1, handler.getResponses().size()); + TimelinePutResponse response = handler.getResponses().get(0); + Assert.assertEquals(1, handler.getResponses().get(0).getErrors().size()); + Assert.assertEquals("test entity id", response.getErrors().get(0) + .getEntityId()); + Assert.assertEquals("test entity type", response.getErrors().get(0) + .getEntityType()); + Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION, + response.getErrors().get(0).getErrorCode()); + } + + @Test + public void testPostEntitiesNoResponse() throws Exception { + LOG.info("testPostEntitiesNoResponse"); + mockClientResponse((TimelineClientImpl)asyncClient.getClient(), + ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); + asyncClient.putEntities(generateEntity()); + asyncClient.waitFor(new Supplier() { + @Override + public Boolean get() { + return handler.notifyError; + } + }); + Assert.assertEquals(0, handler.getResponses().size()); + List errors = handler.getErrors(); + Assert.assertNotNull(errors); + Assert.assertEquals(1, errors.size()); + Assert.assertTrue(errors.get(0).getMessage() + .contains("Failed to get the response from the timeline server.")); + } + + @Test + public void testPostEntitiesConnectionRefused() throws Exception { + LOG.info("testPostEntitiesConnectionRefused"); + mockClientResponse((TimelineClientImpl)asyncClient.getClient(), + null, false, true); + asyncClient.putEntities(generateEntity()); + asyncClient.waitFor(new Supplier() { + @Override + public Boolean get() { + return handler.notifyError; + } + }); + Assert.assertEquals(0, handler.getResponses().size()); + List errors = handler.getErrors(); + Assert.assertNotNull(errors); + Assert.assertEquals(1, errors.size()); + Assert.assertTrue(errors.get(0) instanceof ClientHandlerException); + } + + @Test + public void testPostEntitiesTimelineServiceNotEnabled() throws Exception { + LOG.info("testPostEntitiesTimelineServiceNotEnabled"); + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); + final TestCallbackHandler handler = new TestCallbackHandler(); + TimelineClientAsyncImpl client = createTimelineClientAsync(conf, handler); + mockClientResponse((TimelineClientImpl)client.getClient(), + ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); + client.putEntities(generateEntity()); + client.waitFor(new Supplier() { + @Override + public Boolean get() { + return handler.notifyEntitiesPut; + } + }); + List responses = handler.getResponses(); + Assert.assertNotNull(responses); + Assert.assertEquals(1, responses.size()); + Assert.assertEquals(0, handler.getErrors().size()); + } + + @Test + public void testPostEntitiesTimelineServiceDefaultNotEnabled() + throws Exception { + LOG.info("testPostEntitiesTimelineServiceDefaultNotEnabled"); + YarnConfiguration conf = new YarnConfiguration(); + // Unset the timeline service's enabled properties. + // Make sure default value is pickup up + conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED); + final TestCallbackHandler handler = new TestCallbackHandler(); + TimelineClientAsync client = createTimelineClientAsync(conf, handler); + mockClientResponse((TimelineClientImpl)client.getClient(), + ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); + + client.putEntities(generateEntity()); + client.waitFor(new Supplier() { + @Override + public Boolean get() { + return handler.notifyEntitiesPut; + } + }); + List responses = handler.getResponses(); + Assert.assertNotNull(responses); + Assert.assertEquals(1, responses.size()); + Assert.assertEquals(0, handler.getErrors().size()); + } + + private static ClientResponse mockClientResponse(TimelineClientImpl client, + ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) { + ClientResponse response = mock(ClientResponse.class); + if (hasRuntimeError) { + doThrow(new ClientHandlerException(new ConnectException())).when(client) + .doPostingEntities(any(TimelineEntities.class)); + return response; + } + doReturn(response).when(client) + .doPostingEntities(any(TimelineEntities.class)); + when(response.getClientResponseStatus()).thenReturn(status); + TimelinePutResponse.TimelinePutError error = + new TimelinePutResponse.TimelinePutError(); + error.setEntityId("test entity id"); + error.setEntityType("test entity type"); + error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION); + TimelinePutResponse putResponse = new TimelinePutResponse(); + if (hasError) { + putResponse.addError(error); + } + when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse); + return response; + } + + private static TimelineEntity generateEntity() { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId("entity id"); + entity.setEntityType("entity type"); + entity.setStartTime(System.currentTimeMillis()); + for (int i = 0; i < 2; ++i) { + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType("test event type " + i); + event.addEventInfo("key1", "val1"); + event.addEventInfo("key2", "val2"); + entity.addEvent(event); + } + entity.addRelatedEntity("test ref type 1", "test ref id 1"); + entity.addRelatedEntity("test ref type 2", "test ref id 2"); + entity.addPrimaryFilter("pkey1", "pval1"); + entity.addPrimaryFilter("pkey2", "pval2"); + entity.addOtherInfo("okey1", "oval1"); + entity.addOtherInfo("okey2", "oval2"); + return entity; + } + + private static TimelineClientAsyncImpl createTimelineClientAsync( + YarnConfiguration conf, TimelineClientAsync.CallbackHandler handler) { + TimelineClientAsyncImpl client = + (TimelineClientAsyncImpl) + TimelineClientAsync.createTimelineClient(handler); + client.setClient(spy(TimelineClient.createTimelineClient())); + client.init(conf); + client.start(); + return client; + } + + private static class TestCallbackHandler implements + TimelineClientAsyncImpl.CallbackHandler { + List responses; + List errors; + volatile boolean notifyError = false; + volatile boolean notifyEntitiesPut = false; + + public TestCallbackHandler() { + responses = Collections.synchronizedList( + new ArrayList()); + errors = Collections.synchronizedList(new ArrayList()); + } + + @Override + public void onEntitiesPut(TimelinePutResponse response) { + responses.add(response); + notifyEntitiesPut = true; + } + + @Override + public void onError(Throwable e) { + errors.add(e); + notifyError = true; + } + + public List getResponses() { + return responses; + } + + public List getErrors() { + return errors; + } + } + +}