diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClientAsync.java
new file mode 100644
index 0000000..2266f6c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClientAsync.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientAsyncImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+
+/**
+ * A client library that can be used to post some information in terms of a
+ * number of conceptual entities.
+ */
+@Public
+@Unstable
+public abstract class TimelineClientAsync extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(TimelineClientAsync.class);
+ protected TimelineClient client;
+ protected CallbackHandler callbackHandler;
+
+ @Public
+ public static TimelineClientAsync createTimelineClient(
+ CallbackHandler callbackHandler) {
+ TimelineClientAsync client = new TimelineClientAsyncImpl(callbackHandler);
+ return client;
+ }
+
+ @Private
+ protected TimelineClientAsync(String name, TimelineClient client,
+ CallbackHandler callbackHandler) {
+ super(name);
+ this.setClient(client);
+ this.setCallbackHandler(callbackHandler);
+ }
+
+ public TimelineClient getClient() {
+ return client;
+ }
+
+ public void setClient(TimelineClient client) {
+ this.client = client;
+ }
+
+ public CallbackHandler getCallbackHandler() {
+ return callbackHandler;
+ }
+
+ public void setCallbackHandler(CallbackHandler callbackHandler) {
+ this.callbackHandler = callbackHandler;
+ }
+
+ /**
+ *
+ * Send the information of a number of conceptual entities to the timeline
+ * server. It is a non blocking API.
+ *
+ *
+ * @param entities
+ * the collection of {@link TimelineEntity}
+ * @return the error information if the sent entities are not correctly stored
+ * @throws InterruptedException
+ */
+ @Public
+ public abstract void putEntities(TimelineEntity... entities)
+ throws InterruptedException;
+
+ /**
+ *
+ * Get a delegation token so as to be able to talk to the timeline server in a
+ * secure way. It is blocking API. The method will not return until it gets
+ * the response from the timeline server.
+ *
+ *
+ * @param renewer
+ * Address of the renewer who can renew these tokens when needed by
+ * securely talking to the timeline server
+ * @return a delegation token ({@link Token}) that can be used to talk to the
+ * timeline server
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract Token
+ getDelegationToken(String renewer) throws IOException, YarnException;
+
+ /**
+ * Wait for check to return true for each 1000 ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+ * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check
+ */
+ public void waitFor(Supplier check) throws InterruptedException {
+ waitFor(check, 1000);
+ }
+
+ /**
+ * Wait for check to return true for each
+ * checkEveryMillis ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call check
+ */
+ public void waitFor(Supplier check, int checkEveryMillis)
+ throws InterruptedException {
+ waitFor(check, checkEveryMillis, 1);
+ };
+
+ /**
+ * Wait for check to return true for each
+ * checkEveryMillis ms. In the main loop, this method will log
+ * the message "waiting in main loop" for each logInterval times
+ * iteration to confirm the thread is alive.
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call check
+ * @param logInterval interval to log for each
+ */
+ public void waitFor(Supplier check, int checkEveryMillis,
+ int logInterval) throws InterruptedException {
+ Preconditions.checkNotNull(check, "check should not be null");
+ Preconditions.checkArgument(checkEveryMillis >= 0,
+ "checkEveryMillis should be positive value");
+ Preconditions.checkArgument(logInterval >= 0,
+ "logInterval should be positive value");
+
+ int loggingCounter = logInterval;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check the condition for main loop.");
+ }
+
+ boolean result = check.get();
+ if (result) {
+ LOG.info("Exits the main loop.");
+ return;
+ }
+ if (--loggingCounter <= 0) {
+ LOG.info("Waiting in main loop.");
+ loggingCounter = logInterval;
+ }
+
+ Thread.sleep(checkEveryMillis);
+ } while (true);
+ }
+
+ public interface CallbackHandler {
+
+ /**
+ * Called when the Timeline server responds to a putEntities
+ * request.
+ */
+ public void onEntitiesPut(TimelinePutResponse response);
+
+ /**
+ * Called when error comes the timeline server communications.
+ * Calling stop() is the recommended action.
+ *
+ * @param e
+ */
+ public void onError(Throwable e);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientAsyncImpl.java
new file mode 100644
index 0000000..58baf08
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientAsyncImpl.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
+public class TimelineClientAsyncImpl extends TimelineClientAsync {
+
+ private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
+
+ private volatile boolean keepRunning = false;
+ private final BlockingQueue requestQueue;
+ private final BlockingQueue responseQueue;
+ private final RequestDispatcherThread dispatcherThread;
+ private final CallbackHandlerThread handlerThread;
+ private volatile Throwable savedException;
+ private final TimelineEntity[] ENTITY_EMPTY_ARRAY = new TimelineEntity[0];
+
+ public TimelineClientAsyncImpl(CallbackHandler callbackHandler) {
+ this(TimelineClientAsync.class.getName(), callbackHandler);
+ }
+
+ public TimelineClientAsyncImpl(String name, CallbackHandler callbackHandler) {
+ this(name, TimelineClient.createTimelineClient(), callbackHandler);
+ }
+
+ @Private
+ @VisibleForTesting
+ protected TimelineClientAsyncImpl(String name, TimelineClient client,
+ CallbackHandler callbackHandler) {
+ super(name, client, callbackHandler);
+ requestQueue = new LinkedBlockingQueue();
+ responseQueue = new LinkedBlockingQueue();
+ keepRunning = true;
+ dispatcherThread = new RequestDispatcherThread();
+ handlerThread = new CallbackHandlerThread();
+ }
+
+ protected void serviceInit(Configuration conf) throws Exception {
+ client.init(conf);
+ dispatcherThread.setDaemon(false);
+ handlerThread.setDaemon(false);
+ super.serviceInit(conf);
+ }
+
+ protected void serviceStart() throws Exception {
+ client.start();
+ dispatcherThread.start();
+ handlerThread.start();
+ super.serviceStart();
+ }
+
+ protected void serviceStop() throws Exception {
+ keepRunning = false;
+ client.stop();
+ dispatcherThread.interrupt();
+ handlerThread.interrupt();
+ super.serviceStart();
+ }
+
+ @Override
+ public void putEntities(TimelineEntity... entities)
+ throws InterruptedException {
+ for (TimelineEntity e : entities) {
+ requestQueue.put(e);
+ }
+ }
+
+ @Override
+ public Token getDelegationToken(
+ String renewer) throws IOException, YarnException {
+ return client.getDelegationToken(renewer);
+ }
+
+ private class RequestDispatcherThread extends Thread {
+ public RequestDispatcherThread() {
+ super("TimelineClientAsync request dispatcher thread");
+ }
+
+ public void run() {
+ Collection entities = new ArrayList();
+ while (!this.currentThread().isInterrupted() && keepRunning) {
+ try {
+ entities.add(requestQueue.take());
+ } catch (InterruptedException ex) {
+ LOG.info("Interrupted while waiting for queue", ex);
+ continue;
+ }
+ requestQueue.drainTo(entities);
+
+ TimelinePutResponse response = null;
+ try {
+ response =
+ client.putEntities(entities.toArray(ENTITY_EMPTY_ARRAY));
+ } catch (Throwable ex) {
+ LOG.error("Exception on putEntities", ex);
+ savedException = ex;
+ // interrupt handler thread in case it waiting on the queue
+ handlerThread.interrupt();
+ return;
+ }
+ if (response != null) {
+ while (true) {
+ try {
+ responseQueue.put(response);
+ break;
+ } catch (InterruptedException ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted while waiting to put on response queue",
+ ex);
+ }
+ }
+ }
+ }
+
+ }
+ }
+ }
+
+ private class CallbackHandlerThread extends Thread {
+ public CallbackHandlerThread() {
+ super("TimelineClientAsync callback handler thread");
+ }
+
+ public void run() {
+ while (!this.currentThread().isInterrupted() && keepRunning) {
+ try {
+ TimelinePutResponse response = null;
+ if(savedException != null) {
+ LOG.error("Stopping callback due to: ", savedException);
+ callbackHandler.onError(savedException);
+ return;
+ }
+
+ try {
+ response = responseQueue.take();
+ } catch (InterruptedException ex) {
+ LOG.info("Interrupted while waiting for queue", ex);
+ continue;
+ }
+ callbackHandler.onEntitiesPut(response);
+
+ } catch (Throwable ex) {
+ callbackHandler.onError(ex);
+ // re-throw exception to end the thread
+ throw new YarnRuntimeException(ex);
+ }
+ }
+ }
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientAsync.java
new file mode 100644
index 0000000..8e5bf3d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientAsync.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineClientAsync;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.net.ConnectException;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+
+public class TestTimelineClientAsync {
+ private final Log LOG = LogFactory.getLog(TestTimelineClientAsync.class);
+
+ private TimelineClientAsyncImpl asyncClient;
+ private TestCallbackHandler handler;
+
+ @Before
+ public void setup() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ handler = new TestCallbackHandler();
+ asyncClient = createTimelineClientAsync(conf, handler);
+ }
+
+ @After
+ public void tearDown() {
+ if (asyncClient != null) {
+ asyncClient.stop();
+ }
+ }
+
+ @Test
+ public void testPostEntities() throws Exception {
+ mockClientResponse((TimelineClientImpl)asyncClient.getClient(),
+ ClientResponse.Status.OK, false, false);
+ asyncClient.putEntities(generateEntity());
+ asyncClient.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ return handler.notifyEntitiesPut;
+ }
+ });
+ Assert.assertEquals(1, handler.getResponses().size());
+ Assert.assertEquals(0, handler.getResponses().get(0).getErrors().size());
+ }
+
+ @Test
+ public void testPostEntitiesWithError() throws Exception {
+ LOG.info("testPostEntitiesWithError");
+ mockClientResponse((TimelineClientImpl) asyncClient.getClient(),
+ ClientResponse.Status.OK, true, false);
+ asyncClient.putEntities(generateEntity());
+ asyncClient.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ return handler.notifyEntitiesPut;
+ }
+ });
+ Assert.assertEquals(1, handler.getResponses().size());
+ TimelinePutResponse response = handler.getResponses().get(0);
+ Assert.assertEquals(1, handler.getResponses().get(0).getErrors().size());
+ Assert.assertEquals("test entity id", response.getErrors().get(0)
+ .getEntityId());
+ Assert.assertEquals("test entity type", response.getErrors().get(0)
+ .getEntityType());
+ Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION,
+ response.getErrors().get(0).getErrorCode());
+ }
+
+ @Test
+ public void testPostEntitiesNoResponse() throws Exception {
+ LOG.info("testPostEntitiesNoResponse");
+ mockClientResponse((TimelineClientImpl)asyncClient.getClient(),
+ ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+ asyncClient.putEntities(generateEntity());
+ asyncClient.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ return handler.notifyError;
+ }
+ });
+ Assert.assertEquals(0, handler.getResponses().size());
+ List errors = handler.getErrors();
+ Assert.assertNotNull(errors);
+ Assert.assertEquals(1, errors.size());
+ Assert.assertTrue(errors.get(0).getMessage()
+ .contains("Failed to get the response from the timeline server."));
+ }
+
+ @Test
+ public void testPostEntitiesConnectionRefused() throws Exception {
+ LOG.info("testPostEntitiesConnectionRefused");
+ mockClientResponse((TimelineClientImpl)asyncClient.getClient(),
+ null, false, true);
+ asyncClient.putEntities(generateEntity());
+ asyncClient.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ return handler.notifyError;
+ }
+ });
+ Assert.assertEquals(0, handler.getResponses().size());
+ List errors = handler.getErrors();
+ Assert.assertNotNull(errors);
+ Assert.assertEquals(1, errors.size());
+ Assert.assertTrue(errors.get(0) instanceof ClientHandlerException);
+ }
+
+ @Test
+ public void testPostEntitiesTimelineServiceNotEnabled() throws Exception {
+ LOG.info("testPostEntitiesTimelineServiceNotEnabled");
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
+ final TestCallbackHandler handler = new TestCallbackHandler();
+ TimelineClientAsyncImpl client = createTimelineClientAsync(conf, handler);
+ mockClientResponse((TimelineClientImpl)client.getClient(),
+ ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+ client.putEntities(generateEntity());
+ client.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ return handler.notifyEntitiesPut;
+ }
+ });
+ List responses = handler.getResponses();
+ Assert.assertNotNull(responses);
+ Assert.assertEquals(1, responses.size());
+ Assert.assertEquals(0, handler.getErrors().size());
+ }
+
+ @Test
+ public void testPostEntitiesTimelineServiceDefaultNotEnabled()
+ throws Exception {
+ LOG.info("testPostEntitiesTimelineServiceDefaultNotEnabled");
+ YarnConfiguration conf = new YarnConfiguration();
+ // Unset the timeline service's enabled properties.
+ // Make sure default value is pickup up
+ conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED);
+ final TestCallbackHandler handler = new TestCallbackHandler();
+ TimelineClientAsync client = createTimelineClientAsync(conf, handler);
+ mockClientResponse((TimelineClientImpl)client.getClient(),
+ ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+
+ client.putEntities(generateEntity());
+ client.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ return handler.notifyEntitiesPut;
+ }
+ });
+ List responses = handler.getResponses();
+ Assert.assertNotNull(responses);
+ Assert.assertEquals(1, responses.size());
+ Assert.assertEquals(0, handler.getErrors().size());
+ }
+
+ private static ClientResponse mockClientResponse(TimelineClientImpl client,
+ ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) {
+ ClientResponse response = mock(ClientResponse.class);
+ if (hasRuntimeError) {
+ doThrow(new ClientHandlerException(new ConnectException())).when(client)
+ .doPostingEntities(any(TimelineEntities.class));
+ return response;
+ }
+ doReturn(response).when(client)
+ .doPostingEntities(any(TimelineEntities.class));
+ when(response.getClientResponseStatus()).thenReturn(status);
+ TimelinePutResponse.TimelinePutError error =
+ new TimelinePutResponse.TimelinePutError();
+ error.setEntityId("test entity id");
+ error.setEntityType("test entity type");
+ error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION);
+ TimelinePutResponse putResponse = new TimelinePutResponse();
+ if (hasError) {
+ putResponse.addError(error);
+ }
+ when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse);
+ return response;
+ }
+
+ private static TimelineEntity generateEntity() {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setEntityId("entity id");
+ entity.setEntityType("entity type");
+ entity.setStartTime(System.currentTimeMillis());
+ for (int i = 0; i < 2; ++i) {
+ TimelineEvent event = new TimelineEvent();
+ event.setTimestamp(System.currentTimeMillis());
+ event.setEventType("test event type " + i);
+ event.addEventInfo("key1", "val1");
+ event.addEventInfo("key2", "val2");
+ entity.addEvent(event);
+ }
+ entity.addRelatedEntity("test ref type 1", "test ref id 1");
+ entity.addRelatedEntity("test ref type 2", "test ref id 2");
+ entity.addPrimaryFilter("pkey1", "pval1");
+ entity.addPrimaryFilter("pkey2", "pval2");
+ entity.addOtherInfo("okey1", "oval1");
+ entity.addOtherInfo("okey2", "oval2");
+ return entity;
+ }
+
+ private static TimelineClientAsyncImpl createTimelineClientAsync(
+ YarnConfiguration conf, TimelineClientAsync.CallbackHandler handler) {
+ TimelineClientAsyncImpl client =
+ (TimelineClientAsyncImpl)
+ TimelineClientAsync.createTimelineClient(handler);
+ client.setClient(spy(TimelineClient.createTimelineClient()));
+ client.init(conf);
+ client.start();
+ return client;
+ }
+
+ private static class TestCallbackHandler implements
+ TimelineClientAsyncImpl.CallbackHandler {
+ List responses;
+ List errors;
+ volatile boolean notifyError = false;
+ volatile boolean notifyEntitiesPut = false;
+
+ public TestCallbackHandler() {
+ responses = Collections.synchronizedList(
+ new ArrayList());
+ errors = Collections.synchronizedList(new ArrayList());
+ }
+
+ @Override
+ public void onEntitiesPut(TimelinePutResponse response) {
+ responses.add(response);
+ notifyEntitiesPut = true;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ errors.add(e);
+ notifyError = true;
+ }
+
+ public List getResponses() {
+ return responses;
+ }
+
+ public List getErrors() {
+ return errors;
+ }
+ }
+
+}