diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/CuratorJobQueueStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/CuratorJobQueueStorage.java
new file mode 100644
index 00000000000..c0b70b167c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/CuratorJobQueueStorage.java
@@ -0,0 +1,226 @@
+package org.apache.hadoop.applications.mawo.server.master;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
+import org.apache.hadoop.applications.mawo.server.common.NullTask;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.common.TaskId;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.common.IOUtils;
+import org.apache.zookeeper.data.ACL;
+
+import com.google.common.base.Preconditions;
+
+public class CuratorJobQueueStorage extends JobQueueStorage {
+  private CuratorFramework curator;
+  private String zkWorkingPath;
+  private List<ACL> zkAcl;
+  private String fencingNodePath;
+
+  private static final String FENCING_LOCK = "ZK_FENCING_LOCK";
+
+  public CuratorJobQueueStorage(String appName) {
+    super(appName);
+  }
+
+  @Override
+  public void jobQueueStorageInit(MawoConfiguration conf) throws IOException {
+    zkWorkingPath = conf.getZKParentPath();
+    String zkHostPort = conf.getZKAddress();
+    int zkSessionTimeout = conf.getZKSessionTimeoutMS();
+    int maxRetryNum = conf.getZKRetriesNum();
+    int retryInterval = conf.getZKRetryIntervalMS();
+    String zkAclConf = conf.getZKAcl();
+    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+    zkAcl = ZKUtil.parseACLs(zkAclConf);
+    fencingNodePath = getNodePath(zkWorkingPath, FENCING_LOCK);
+    curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
+        .sessionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(maxRetryNum, retryInterval)).build();
+    curator.start();
+  }
+
+  @Override
+  public void jobQueueStorageStart() throws Exception {
+    System.out.println("Curator Job server start function");
+    createRootDirRecursively(zkWorkingPath);
+    curator.setACL().withACL(zkAcl).forPath(zkWorkingPath);
+    delete(fencingNodePath);
+    System.out.println("Curator Job server start function ends");
+  }
+
+  public void stop() {
+    IOUtils.closeStream(curator);
+  }
+
+  @Override
+  public void jobQueueStorageStoreTask(Task task) throws Exception {
+    String appPath = getNodePath(zkWorkingPath, appName);
+    create(appPath);
+    String taskCreatePath = getNodePath(appPath, task.getTaskId().toString());
+    byte[] taskData = serialize(task);
+    safeCreate(taskCreatePath, taskData, zkAcl, CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public List<Task> jobQueueStorageLoadTasks() throws Exception {
+    List<Task> tasks = new ArrayList<Task>();
+    String appPath = getNodePath(zkWorkingPath, appName);
+    if (!exists(appPath)) {
+      return tasks;
+    }
+    List<String> childNodes = curator.getChildren().forPath(appPath);
+    for (String childNodeName : childNodes) {
+      String childNodePath = getNodePath(appPath, childNodeName);
+      byte[] childData = curator.getData().forPath(childNodePath);
+      Task task = new NullTask();
+      deserialize(task, childData);
+      tasks.add(task);
+    }
+    return tasks;
+  }
+
+  @Override
+  public void jobQueueStorageRemoveTask(Task task) throws Exception {
+    String appPath = getNodePath(zkWorkingPath, appName);
+    if (!exists(appPath)) {
+      return;
+    }
+    String taskId = task.getTaskId().toString();
+    String taskRemovePath = getNodePath(appPath, taskId);
+    safeDelete(taskRemovePath);
+  }
+
+  private void createRootDirRecursively(String path) throws Exception {
+    System.out.println("Path = " + path);
+    String[] pathParts = path.split("/");
+    Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(),
+        "Invalid path: %s", path);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 1; i < pathParts.length; i++) {
+      sb.append("/").append(pathParts[i]);
+      create(sb.toString());
+    }
+  }
+
+  private void create(final String path) throws Exception {
+    if (!exists(path)) {
+      curator.create().withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+          .forPath(path, null);
+    }
+  }
+
+  private void delete(final String path) throws Exception {
+    if (exists(path)) {
+      curator.delete().deletingChildrenIfNeeded().forPath(path);
+    }
+  }
+
+  private boolean exists(final String path) throws Exception {
+    return curator.checkExists().forPath(path) != null;
+  }
+
+  private void safeCreate(String path, byte[] data, List<ACL> acl,
+      CreateMode mode) throws Exception {
+    if (!exists(path)) {
+      SafeTransaction transaction = new SafeTransaction();
+      transaction.create(path, data, acl, mode);
+      transaction.commit();
+    }
+  }
+
+  private void safeDelete(final String path) throws Exception {
+    if (exists(path)) {
+      SafeTransaction transaction = new SafeTransaction();
+      transaction.delete(path);
+      transaction.commit();
+    }
+  }
+
+  private String getNodePath(String root, String nodeName) {
+    return (root + "/" + nodeName);
+  }
+
+  /**
+   * Use curator transactions to ensure zk-operations are performed in an
+   * all-or-nothing fashion.
+   */
+  private class SafeTransaction {
+    private CuratorTransactionFinal transactionFinal;
+
+    SafeTransaction() throws Exception {
+      CuratorTransaction transaction = curator.inTransaction();
+      transactionFinal = transaction.create().withMode(CreateMode.PERSISTENT)
+          .withACL(zkAcl).forPath(fencingNodePath, new byte[0]).and();
+    }
+
+    public void commit() throws Exception {
+      transactionFinal =
+          transactionFinal.delete().forPath(fencingNodePath).and();
+      transactionFinal.commit();
+    }
+
+    public void create(String path, byte[] data, List<ACL> acl,
+        CreateMode mode) throws Exception {
+      transactionFinal = transactionFinal.create().withMode(mode).withACL(acl)
+          .forPath(path, data).and();
+    }
+
+    public void delete(String path) throws Exception {
+      transactionFinal = transactionFinal.delete().forPath(path).and();
+    }
+  }
+
+  private byte[] serialize(Task task) throws IOException {
+    ByteArrayOutputStream out = null;
+    DataOutputStream dataOut = null;
+    try {
+      out = new ByteArrayOutputStream();
+      dataOut = new DataOutputStream(out);
+      task.write(dataOut);
+      return out.toByteArray();
+    } finally {
+      org.apache.hadoop.io.IOUtils.cleanup(null, dataOut, out);
+    }
+  }
+
+  private byte[] deserialize(Task task, byte[] bytes) throws IOException {
+    ByteArrayInputStream in = null;
+    DataInputStream dataIn = null;
+    try {
+      in = new ByteArrayInputStream(bytes);
+      dataIn = new DataInputStream(in);
+      task.readFields(dataIn);
+      return bytes;
+    } finally {
+      org.apache.hadoop.io.IOUtils.cleanup(null, dataIn, in);
+    }
+  }
+
+  @Override
+  public Task jobQueueStorageLoadTask(TaskId taskId) throws Exception {
+    Task task = new NullTask();
+    String appPath = getNodePath(zkWorkingPath, appName);
+    if (!exists(appPath)) {
+      return task;
+    }
+    String taskPath = getNodePath(appPath, taskId.toString());
+    byte[] childData = curator.getData().forPath(taskPath);
+    deserialize(task, childData);
+    return task;
+  }
+}
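The SafeTransaction helper above implements ZooKeeper fencing: every transaction creates the fencing znode first and deletes it again at commit, so two masters writing against the same parent path cannot both commit. Below is a minimal standalone sketch of that pattern, using the same Curator 2.x inTransaction() API the patch uses. The connect string, parent path, and payload are illustrative assumptions, and the parent znodes must already exist, since a transactional create does not create parents.

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;

    public class FencingSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical connect string and paths, for illustration only.
        CuratorFramework client = CuratorFrameworkFactory
            .newClient("localhost:2181", new RetryNTimes(3, 1000));
        client.start();
        String fencingPath = "/mawo/ZK_FENCING_LOCK";

        // Open the transaction by creating the fencing node. If another
        // writer currently holds the fencing node, this create fails at
        // commit time and the whole batch rolls back.
        CuratorTransactionFinal txn = client.inTransaction()
            .create().withMode(CreateMode.PERSISTENT)
            .forPath(fencingPath, new byte[0]).and();

        // Queue the real operation(s).
        txn = txn.create().withMode(CreateMode.PERSISTENT)
            .forPath("/mawo/app_1/task_1", "serialized-task".getBytes("UTF-8"))
            .and();

        // Release the fence in the same atomic batch.
        txn.delete().forPath(fencingPath).and().commit();
        client.close();
      }
    }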
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/JobQueueStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/JobQueueStorage.java
new file mode 100644
index 00000000000..348e7ae4287
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/JobQueueStorage.java
@@ -0,0 +1,82 @@
+package org.apache.hadoop.applications.mawo.server.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
+import org.apache.hadoop.applications.mawo.server.common.NullTask;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.common.TaskId;
+import org.apache.log4j.Logger;
+
+public abstract class JobQueueStorage {
+
+  private boolean jobQueueStorageEnabled;
+  static final Logger LOG = Logger.getLogger(JobQueueStorage.class);
+  protected final String appName;
+
+  public JobQueueStorage(String appName) {
+    this.appName = appName;
+  }
+
+  public static JobQueueStorage getJobQueueStorage(String appName) {
+    return new CuratorJobQueueStorage(appName);
+  }
+
+  public void init(MawoConfiguration conf) throws IOException {
+    this.jobQueueStorageEnabled = conf.getJobQueueStorageEnabled();
+    if (!jobQueueStorageEnabled) {
+      return;
+    }
+    jobQueueStorageInit(conf);
+  }
+
+  public void start() throws Exception {
+    if (!jobQueueStorageEnabled) {
+      return;
+    }
+    LOG.info("Starting JobQueue .. ");
+    jobQueueStorageStart();
+  }
+
+  public void storeTask(Task task) throws Exception {
+    if (!jobQueueStorageEnabled) {
+      return;
+    }
+    jobQueueStorageStoreTask(task);
+  }
+
+  public List<Task> loadTasks() throws Exception {
+    if (!jobQueueStorageEnabled) {
+      return new ArrayList<Task>();
+    }
+    return jobQueueStorageLoadTasks();
+  }
+
+  public void removeTask(Task task) throws Exception {
+    if (!jobQueueStorageEnabled) {
+      return;
+    }
+    jobQueueStorageRemoveTask(task);
+  }
+
+  public Task loadTask(TaskId taskId) throws Exception {
+    if (!jobQueueStorageEnabled) {
+      return new NullTask();
+    }
+    return jobQueueStorageLoadTask(taskId);
+  }
+
+  abstract void jobQueueStorageInit(MawoConfiguration conf) throws IOException;
+
+  abstract void jobQueueStorageStart() throws Exception;
+
+  abstract void jobQueueStorageStoreTask(Task task) throws Exception;
+
+  abstract List<Task> jobQueueStorageLoadTasks() throws Exception;
+
+  abstract Task jobQueueStorageLoadTask(TaskId taskId) throws Exception;
+
+  abstract void jobQueueStorageRemoveTask(Task task) throws Exception;
+}
\ No newline at end of file
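JobQueueStorage is a template: the public init/start/storeTask/loadTasks/removeTask/loadTask wrappers short-circuit when job-queue storage is disabled in MawoConfiguration, and subclasses implement only the jobQueueStorage* primitives. The static factory is hard-wired to CuratorJobQueueStorage, so the in-memory subclass below is purely illustrative of that contract; it would have to live in the same package because the abstract hooks are package-private.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
    import org.apache.hadoop.applications.mawo.server.common.NullTask;
    import org.apache.hadoop.applications.mawo.server.common.Task;
    import org.apache.hadoop.applications.mawo.server.common.TaskId;

    // Hypothetical, non-persistent implementation for illustration only.
    public class InMemoryJobQueueStorage extends JobQueueStorage {
      private final Map<TaskId, Task> tasks =
          new ConcurrentHashMap<TaskId, Task>();

      public InMemoryJobQueueStorage(String appName) {
        super(appName);
      }

      @Override
      void jobQueueStorageInit(MawoConfiguration conf) throws IOException {
        // Nothing to initialize for the in-memory variant.
      }

      @Override
      void jobQueueStorageStart() {
      }

      @Override
      void jobQueueStorageStoreTask(Task task) {
        tasks.put(task.getTaskId(), task);
      }

      @Override
      List<Task> jobQueueStorageLoadTasks() {
        return new ArrayList<Task>(tasks.values());
      }

      @Override
      Task jobQueueStorageLoadTask(TaskId taskId) {
        Task task = tasks.get(taskId);
        return task != null ? task : new NullTask();
      }

      @Override
      void jobQueueStorageRemoveTask(Task task) {
        tasks.remove(task.getTaskId());
      }
    }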
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/Master.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/Master.java
new file mode 100644
index 00000000000..c19a97c5989
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/Master.java
@@ -0,0 +1,230 @@
+package org.apache.hadoop.applications.mawo.server.master;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.applications.mawo.server.common.DieTask;
+import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.master.job.Job;
+import org.apache.hadoop.applications.mawo.server.master.job.JobBuilder;
+import org.apache.hadoop.applications.mawo.server.master.job.JobBuilderFactory;
+import org.apache.hadoop.applications.mawo.server.master.job.JobDescriptor;
+import org.apache.hadoop.applications.mawo.server.master.job.JobId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.log4j.Logger;
+
+public class Master extends AbstractService {
+
+  private static final Logger LOG = Logger.getLogger(Master.class);
+
+  private WorkAssigner workAssigner;
+  private JobQueueStorage jobQueueStorage;
+  private Job job = null;
+  private final String appName;
+  private final String jobFile;
+  MawoConfiguration mawoConf;
+  private Task setupTask;
+  private Task teardownTask;
+  private final long teardownWorkerValidityInterval;
+
+  public Master(String appName, MawoConfiguration mawoConf, String jobFile) {
+    super(Master.class.getName());
+    this.jobFile = jobFile;
+    this.mawoConf = mawoConf;
+
+    workAssigner = new WorkAssigner(this);
+
+    this.appName = appName;
+    this.teardownWorkerValidityInterval = this.mawoConf
+        .getTeardownWorkerValidityInterval();
+    LOG.info("Application name: " + appName);
+    jobQueueStorage = JobQueueStorage.getJobQueueStorage(appName);
+  }
+
+  public void setMawoConf(MawoConfiguration mawoConf) {
+    this.mawoConf = mawoConf;
+  }
+
+  @Override
+  protected void serviceInit(Configuration uselessConf) throws Exception {
+
+    this.job = new Job(mawoConf);
+
+    jobQueueStorage.init(mawoConf);
+    workAssigner.init(uselessConf);
+
+    super.serviceInit(uselessConf);
+  }
+
+  public Task getSetupTask() {
+    return this.setupTask;
+  }
+
+  public Task getTeardownTask() {
+    return this.teardownTask;
+  }
+
+  // Read job descriptions from submissions.
+  // For each job, generate tasks, persist tasks to jobQueueStorage, and then
+  // add them to workAssigner so that workers can pull from that RPC server.
+  @Override
+  protected void serviceStart() throws Exception {
+
+    LOG.info("Starting MaWo Master .. ");
+
+    jobQueueStorage.start();
+
+    LOG.info("Creating the Job .. ");
+    JobDescriptor jobDescriptor =
+        new JobDescriptor(this.job.getJobId(), jobFile);
+    JobBuilder jobBuilder = JobBuilderFactory.createJobBuilder(mawoConf);
+    // Job must be created before starting the work-assigner.
+    jobBuilder.deserializeJob(jobDescriptor, this.job);
+
+    // Get setup task and teardown task before we start the workAssigner.
+    this.setupTask = job.getSetupTask();
+    this.teardownTask = job.getTeardownTask();
+
+    workAssigner.start();
+
+    super.serviceStart();
+
+    // for (String jobFile : jobFiles) {
+    //   Job job = new Job(jobFile);
+    //   jobList.add(job);
+    // }
+
+    // for (Job job : jobList) {
+    //   List<Task> tasks = job.generateTasks();
+    //   for (Task task : tasks) {
+    //     jobQueueStorage.storeTask(task);
+    //     workAssigner.addTask(task);
+    //   }
+    // }
+    List<Task> tasks = job.getTasks();
+    for (Task task : tasks) {
+      jobQueueStorage.storeTask(task);
+      workAssigner.addTask(task);
+      LOG.debug("Adding " + task.getTaskId() + " to Workassigner");
+    }
+
+    // Wait for all tasks to finish.
+    while (!waitForAllTaskDone()) {
+      Thread.sleep(60);
+    }
+
+    Thread.sleep(60);
+
+    // Wait till the teardown task is done.
+    while (!workAssigner.isTeardownTaskDone()) {
+      Thread.sleep(60);
+    }
+
+    // Disable Master die for POC.
+    if (mawoConf.getAutoShutdownWorkers()) {
+      LOG.info("Shutting down all workers ..");
+      workAssigner.sendDieTask(new DieTask());
+
+      // Wait for all Workers to stop.
+      while (workAssigner.getActiveWorkerCount() != 0) {
+        Thread.sleep(10);
+      }
+    }
+
+    workAssigner.stop();
+    LOG.info("MaWo Master is done .. ");
+  }
+
+  private void jobTearDown(MawoConfiguration conf)
+      throws ClientProtocolException, IOException {
+
+    LOG.info("Shutting down the job");
+
+    HttpClient httpClient = new DefaultHttpClient();
+    String baseURL = conf.getClusterManagerURL();
+    if (baseURL != null) {
+      String resourceUri = "/services/v1/applications/";
+      HttpDelete delete = new HttpDelete(baseURL + resourceUri + this.appName);
+      delete.setHeader("Accept", "application/json");
+      httpClient.execute(delete);
+    }
+  }
+
+  public JobQueueStorage getJobQueueStorage() {
+    return this.jobQueueStorage;
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    Options opts = new Options();
+
+    Option appNameOpt = new Option("mawo_app_name", true,
+        "The app name. The name will be used to shut down and tear down "
+            + "this app. (required)");
+    appNameOpt.setRequired(true);
+    opts.addOption(appNameOpt);
+
+    Option jobOpt = new Option("job_description_file_path", true,
+        "Job Description file path. (required)");
+    jobOpt.setRequired(true);
+    opts.addOption(jobOpt);
+
+    CommandLineParser parser = new GnuParser();
+    String appName = null;
+    String jobFile = null;
+    try {
+      CommandLine commandLine = parser.parse(opts, args, true);
+      appName = commandLine.getOptionValue("mawo_app_name");
+      jobFile = commandLine.getOptionValue("job_description_file_path");
+    } catch (ParseException e) {
+      LOG.error("Options parsing failed: " + e.getMessage());
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.setSyntaxPrefix("");
+      formatter.printHelp("general options are:", opts);
+      throw e;
+    }
+
+    Master master =
+        new Master(appName.trim(), new MawoConfiguration(), jobFile);
+    master.init(new Configuration());
+    master.start();
+  }
+
+  public JobId getJobId() {
+    if (this.job == null) {
+      return null;
+    } else {
+      return this.job.getJobId();
+    }
+  }
+
+  private boolean waitForAllTaskDone() {
+    if (workAssigner.allTasksHaveFinished()) {
+      return true;
+    } else {
+      // Check whether all workers are already in teardown state
+      // after teardownWorkerValidityInterval.
+      long lastWorkerRegisterTime = this.workAssigner
+          .getLastWorkerRegisterTime();
+      if (lastWorkerRegisterTime > 0 && System.currentTimeMillis()
+          - lastWorkerRegisterTime > teardownWorkerValidityInterval) {
+        return this.workAssigner.isTeardownTaskDone();
+      } else {
+        return false;
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/WorkAssigner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/WorkAssigner.java
new file mode 100644
index 00000000000..67b0dba0d1b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/WorkAssigner.java
@@ -0,0 +1,614 @@
+package org.apache.hadoop.applications.mawo.server.master;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.PrintStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
+import org.apache.hadoop.applications.mawo.server.common.NullTask;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.common.TaskId;
+import org.apache.hadoop.applications.mawo.server.common.TaskStatus;
+import org.apache.hadoop.applications.mawo.server.common.TaskStatus.State;
+import org.apache.hadoop.applications.mawo.server.common.WorkAssignmentProtocol;
+import org.apache.hadoop.applications.mawo.server.worker.WorkerId;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+public class WorkAssigner extends AbstractService
+    implements WorkAssignmentProtocol {
+
+  public static final String JOB_DONE_MSG = "All Workers have finished "
+      + "the teardown task. The job is done.";
+  private static final Logger LOG = Logger.getLogger(WorkAssigner.class);
+
+  private RPC.Server server;
+  private MawoConfiguration mawoConf;
+
+  private final AtomicInteger workerCounter = new AtomicInteger(0);
+
+  private final ConcurrentLinkedQueue<Task> taskQueue =
+      new ConcurrentLinkedQueue<Task>();
+  private Map<TaskId, TaskStatus> allTasksStatuses =
+      new HashMap<TaskId, TaskStatus>();
+
+  private Map<WorkerId, WorkerStatus> workerIds =
+      new HashMap<WorkerId, WorkerStatus>();
+
+  private Map<TaskId, Set<WorkerId>> blackList =
+      new HashMap<TaskId, Set<WorkerId>>();
+
+  private Map<WorkerId, TaskStatus> teardownWorkerIdList =
+      new ConcurrentHashMap<WorkerId, TaskStatus>();
+
+  private Task teardownTask = null;
+  private Task setupTask = null;
+  private Task dieTask = null;
+
+  private Master master;
+
+  private boolean queueStorageEnabled;
+
+  private int numberOfTasksToRun = 0;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+
+  private final BlockingQueue<JSONObject> eventQueue;
+
+  private volatile boolean blockNewEvents = false;
+  private volatile boolean stopped = false;
+  private volatile boolean drained = true;
+  private final Object waitForDrained = new Object();
+  private PrintStream out = null;
+  private Thread eventHandlingThread;
+  private boolean dumpJobDoneMSG = false;
+  private volatile long lastWorkerRegisterTime = 0;
+
+  public WorkAssigner(Master master) {
+    super(WorkAssigner.class.getName());
+
+    this.master = master;
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+    this.eventQueue = new LinkedBlockingQueue<JSONObject>();
+  }
+
+  @Override
+  protected void serviceInit(Configuration uselessConf) throws Exception {
+    this.mawoConf = this.master.mawoConf;
+    this.queueStorageEnabled = this.mawoConf.getJobQueueStorageEnabled();
+
+    super.serviceInit(uselessConf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+
+    LOG.info("Starting WorkAssigner .. ");
+
+    String logPath = this.mawoConf.getMasterTasksStatusLogPath();
+    // Create logPath and the tasks-status log file.
+    if (logPath.startsWith("$")) {
+      String logPathKey = logPath.substring(1);
+      logPath = System.getenv(logPathKey);
+    }
+    File dir = new File(logPath);
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+    File file = new File(dir, "mawo_history.log");
+    if (file.exists() && !file.isDirectory()) {
+      file.delete();
+    }
+    file.createNewFile();
+    out = new PrintStream(file.getAbsolutePath(), "UTF-8");
+
+    eventHandlingThread = new Thread(createThread());
+    eventHandlingThread.setName("Task status dump event handler");
+    eventHandlingThread.start();
+
+    this.setupTask = this.master.getSetupTask();
+    this.teardownTask = this.master.getTeardownTask();
+
+    server = new RPC.Builder(new Configuration()).setInstance(this)
+        .setProtocol(WorkAssignmentProtocol.class)
+        .setBindAddress(this.mawoConf.getRpcHostName())
+        .setPort(this.mawoConf.getRpcServerPort())
+        .build();
+
+    server.start();
+    LOG.debug("RPC Server from WorkAssigner started successfully");
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Drain the event queue.
+    blockNewEvents = true;
+    LOG.info("Draining to stop, ignoring any new events.");
+    long endTime = System.currentTimeMillis()
+        + this.mawoConf.getMasterDrainEventsTimeout();
+
+    synchronized (waitForDrained) {
+      while (!isDrained() && eventHandlingThread != null
+          && eventHandlingThread.isAlive()
+          && System.currentTimeMillis() < endTime) {
+        waitForDrained.wait(1000);
+        LOG.info("Waiting for dump event queue to drain. Thread state is: "
+            + eventHandlingThread.getState());
+      }
+    }
+    stopped = true;
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+      try {
+        eventHandlingThread.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted Exception while stopping", ie);
+      }
+    }
+    IOUtils.closeQuietly(out);
+    server.stop();
+    super.serviceStop();
+  }
+
+  public boolean allTasksHaveFinished() {
+    Iterator<TaskStatus> taskIter = allTasksStatuses.values().iterator();
+    int numFinishedTasks = 0;
+    while (taskIter.hasNext()) {
+      TaskStatus taskStatus = taskIter.next();
+      TaskStatus.State taskState = taskStatus.getRunState();
+      if (isFinalState(taskState)) {
+        numFinishedTasks++;
+      } else {
+        return false;
+      }
+    }
+
+    return numFinishedTasks >= this.numberOfTasksToRun;
+  }
+
+  @Override
+  public Task sendHeartbeat(WorkerId workerId, TaskStatus[] taskStatusList) {
+
+    if (dieTask != null) {
+      // We are in the death phase.
+      LOG.info("Sending die-task " + this.dieTask + " to the worker "
+          + workerId.getHostname());
+      return this.dieTask;
+    }
+    if (!this.workerIds.containsKey(workerId)) {
+      // Re-register the worker, and send back the setup task.
+      return this.registerWorker(workerId);
+    }
+
+    // If the current worker status is REGISTERED,
+    // we are expecting the setup task status from the worker.
+    // Skip other task statuses for now because the master will not
+    // assign other tasks to this worker until the setup task is done.
+    if (this.workerIds.get(workerId) == WorkerStatus.REGISTERED) {
+      TaskStatus setUpTaskStatus = retrieveSetupTask(taskStatusList);
+      if (setUpTaskStatus == null) {
+        LOG.warn("Worker:" + workerId.getHostname() + " status is "
+            + WorkerStatus.REGISTERED
+            + ". Waiting for setup task status to move forward.");
+        return new NullTask();
+      } else {
+        dumpTaskStatus(null, setUpTaskStatus);
+        if (setUpTaskStatus.getRunState() == State.SUCCEEDED) {
+          this.workerIds.put(workerId, WorkerStatus.RUNNING);
+        } else if (setUpTaskStatus.getRunState() == State.INIT
+            || setUpTaskStatus.getRunState() == State.RUNNING) {
+          this.workerIds.put(workerId, WorkerStatus.SETUP);
+          return new NullTask();
+        } else {
+          this.workerIds.put(workerId, WorkerStatus.TEARDOWN);
+          return getTearDownTask();
+        }
+      }
+    }
+
+    // If the current worker status is SETUP, we received a setup task
+    // status before, but are still waiting for the setup task to finish.
+    // Skip other task statuses for now because the master will not
+    // assign other tasks to this worker until the setup task is done.
+    if (this.workerIds.get(workerId) == WorkerStatus.SETUP) {
+      TaskStatus setUpTaskStatus = retrieveSetupTask(taskStatusList);
+      if (setUpTaskStatus == null) {
+        LOG.warn("Worker:" + workerId.getHostname() + " status is "
+            + WorkerStatus.SETUP
+            + ". Waiting for setup task status to move forward.");
+        return new NullTask();
+      } else {
+        if (setUpTaskStatus.getRunState() == State.SUCCEEDED) {
+          dumpTaskStatus(null, setUpTaskStatus);
+          this.workerIds.put(workerId, WorkerStatus.RUNNING);
+        } else if (setUpTaskStatus.getRunState() == State.INIT
+            || setUpTaskStatus.getRunState() == State.RUNNING) {
+          return new NullTask();
+        } else {
+          dumpTaskStatus(null, setUpTaskStatus);
+          this.workerIds.put(workerId, WorkerStatus.TEARDOWN);
+          return getTearDownTask();
+        }
+      }
+    }
+
+    // The worker transitions into TEARDOWN status in two scenarios:
+    // 1. The setup task on the worker fails.
+    // 2. All tasks finished.
+    // In both scenarios, the master has already sent the teardown task
+    // to the worker. We only need to take care of the teardown task status.
+    if (this.workerIds.get(workerId) == WorkerStatus.TEARDOWN) {
+      TaskStatus previousTask = teardownWorkerIdList.get(workerId);
+      TaskStatus teardownTaskStatus = retrieveTeardownTask(taskStatusList);
+      if (teardownTaskStatus == null) {
+        if (previousTask == null) {
+          LOG.warn("Worker:" + workerId.getHostname() + " status is "
+              + WorkerStatus.TEARDOWN
+              + ". Waiting for teardown task status update.");
+        }
+      } else {
+        dumpTaskStatus(previousTask, teardownTaskStatus);
+        LOG.info("Teardown task status for " + workerId.getHostname()
+            + " is:" + teardownTaskStatus.getRunState());
+        teardownWorkerIdList.put(workerId, teardownTaskStatus);
+      }
+      return new NullTask();
+    }
+
+    // When the worker is in RUNNING state.
+    for (TaskStatus status : taskStatusList) {
+      // We will handle an expired task only if queueStorageEnabled.
+      if (isTearDownTask(status)) {
+        LOG.warn("We do not expect the teardown task status "
+            + "when the worker is in RUNNING state. Ignore it.");
+        continue;
+      }
+      if (!isSetUpTask(status)) {
+        if (status.getRunState() == State.EXPIRE && queueStorageEnabled) {
+          handleExpireTask(workerId, status);
+        } else {
+          // Dump the task status.
+          TaskStatus previous = allTasksStatuses.get(status.getTaskId());
+          dumpTaskStatus(previous, status);
+          allTasksStatuses.put(status.getTaskId(), status);
+        }
+      }
+    }
+
+    if (allTasksHaveFinished()) {
+      // All tasks finished, thus send the teardown task to all workers.
+      LOG.info("Sending teardown task " + getTearDownTask()
+          + " to the worker " + workerId.getHostname());
+      workerIds.put(workerId, WorkerStatus.TEARDOWN);
+      return getTearDownTask();
+    }
+
+    Task task = taskQueue.poll();
+    if (task != null) {
+      if (isWorkerInBlackList(workerId, task.getTaskId())) {
+        // Put the task back so a non-blacklisted worker can pick it up.
+        taskQueue.add(task);
+        LOG.info("Returning Null Task in heartbeat. The worker:" + workerId
+            + " is in the blacklist of Task:" + task.getTaskId());
+        return new NullTask();
+      }
+      LOG.debug("Returning " + task.getTaskId() + " in heartbeat");
+      LOG.debug("Tasks in queue: " + taskQueue.size());
+      return task;
+    } else {
+      LOG.debug("Returning Null Task in heartbeat");
+      return new NullTask();
+    }
+  }
+
+  // Handle an expired task. We will:
+  // 1) Put the current workerId in the blacklist, so we will not assign the
+  //    task to the same worker again.
+  // 2) Load the task info (task_cmd) from JobQueueStorage, and add this task
+  //    back to the queue.
+  private void handleExpireTask(WorkerId workerId, TaskStatus status) {
+    TaskId taskId = status.getTaskId();
+    addBlackList(workerId, taskId);
+    if (isTaskBlackListAllWorkers(taskId)) {
+      LOG.warn("The task:" + taskId
+          + " has blacklisted all the workers. Will not retry");
+      TaskStatus previous = allTasksStatuses.get(taskId);
+      if (previous == null || !isFinalState(previous.getRunState())) {
+        handleDumpEvent(status);
+      }
+      this.allTasksStatuses.put(taskId, status);
+      return;
+    }
+    Task task = null;
+    try {
+      task = this.master.getJobQueueStorage().loadTask(taskId);
+    } catch (Exception e) {
+      LOG.warn("Can not recover task:" + taskId);
+      // Will not retry this task.
+      this.allTasksStatuses.put(taskId, status);
+      return;
+    }
+    taskQueue.add(task);
+  }
+
+  @Override
+  public void addTask(Task task) {
+    LOG.info("Adding Task " + task.getTaskId() + " to the queue");
+    numberOfTasksToRun++;
+    taskQueue.add(task);
+  }
+
+  int getActiveWorkerCount() {
+    return this.workerIds.size();
+  }
+
+  @Override
+  public Text getNewWorkerId() {
+    return new Text(getNextWorkerId());
+  }
+
+  private String getNextWorkerId() {
+    return "worker_" + workerCounter.incrementAndGet();
+  }
+
+  @Override
+  public Task registerWorker(WorkerId workerId) {
+    LOG.info("Registering worker '" + workerId.getHostname() + "'");
+    workerIds.put(workerId, WorkerStatus.REGISTERED);
+    lastWorkerRegisterTime = System.currentTimeMillis();
+    return getSetUpTask();
+  }
+
+  public boolean isTeardownTaskDone() {
+    int numFinishedWorkers = 0;
+    for (Entry<WorkerId, TaskStatus> entry
+        : teardownWorkerIdList.entrySet()) {
+      TaskStatus taskStatus = entry.getValue();
+      TaskStatus.State taskState = taskStatus.getRunState();
+      if (isFinalState(taskState)) {
+        numFinishedWorkers++;
+      }
+    }
+
+    LOG.info("Number of workers that finished the teardown task: "
+        + numFinishedWorkers);
+
+    if (numFinishedWorkers == workerIds.size()) {
+      writeJobDoneMSG();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public void sendDieTask(Task dieTask) {
+    this.dieTask = dieTask;
+  }
+
+  @Override
+  public void deRegisterWorker(WorkerId workerId) {
+    LOG.info("Deregistering " + workerId.getHostname());
+    workerIds.remove(workerId);
+  }
+
+  private void addBlackList(WorkerId workerId, TaskId taskId) {
+    try {
+      this.writeLock.lock();
+      if (this.blackList.containsKey(taskId)) {
+        Set<WorkerId> workerIds = this.blackList.get(taskId);
+        workerIds.add(workerId);
+        this.blackList.put(taskId, workerIds);
+      } else {
+        Set<WorkerId> workIds = new HashSet<WorkerId>();
+        workIds.add(workerId);
+        this.blackList.put(taskId, workIds);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private boolean isWorkerInBlackList(WorkerId workerId, TaskId taskId) {
+    try {
+      this.readLock.lock();
+      if (this.blackList.containsKey(taskId)) {
+        Set<WorkerId> workerIds = this.blackList.get(taskId);
+        return workerIds.contains(workerId);
+      }
+      return false;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  private boolean isTaskBlackListAllWorkers(TaskId taskId) {
+    try {
+      this.readLock.lock();
+      if (this.blackList.containsKey(taskId)) {
+        Set<WorkerId> workerIds = this.blackList.get(taskId);
+        return workerIds.size() >= this.getActiveWorkerCount();
+      }
+      return false;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  private boolean isFinalState(TaskStatus.State taskState) {
+    return (taskState == TaskStatus.State.FAILED)
+        || taskState == TaskStatus.State.KILLED
+        || taskState == TaskStatus.State.SUCCEEDED
+        || taskState == TaskStatus.State.EXPIRE;
+  }
+
+  private boolean isDrained() {
+    return drained;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void handleDumpEvent(TaskStatus status) {
+    if (blockNewEvents) {
+      return;
+    }
+    JSONObject obj = new JSONObject();
+    obj.put("TaskId", status.getTaskId().toString());
+    obj.put("WorkerId", status.getWorkerId().toString());
+    obj.put("WorkerHostName", status.getWorkerId().getHostname().toString());
+    obj.put("WorkerIPAddress", status.getWorkerId().getIPAddress());
+    obj.put("TaskState", status.getRunState().toString());
+    obj.put("TaskType", status.getTaskType());
+    obj.put("TaskCMD", status.getTaskCMD());
+    obj.put("ExitCode", String.valueOf(status.getExitCode()));
+    SimpleDateFormat sdf = new SimpleDateFormat("MMM dd,yyyy HH:mm");
+    Date startDate = new Date(status.getStartTime());
+    obj.put("StartTime", sdf.format(startDate));
+    Date endDate = new Date(status.getEndTime());
+    obj.put("EndTime", sdf.format(endDate));
+    try {
+      eventQueue.put(obj);
+    } catch (InterruptedException ex) {
+      if (!stopped) {
+        LOG.warn("Task status dump thread interrupted", ex);
+        drained = eventQueue.isEmpty();
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  Runnable createThread() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          drained = eventQueue.isEmpty();
+          if (blockNewEvents) {
+            synchronized (waitForDrained) {
+              if (drained) {
+                waitForDrained.notify();
+              }
+            }
+          }
+          List<JSONObject> taskStatusObjs = new ArrayList<JSONObject>();
+          eventQueue.drainTo(taskStatusObjs);
+          if (!taskStatusObjs.isEmpty()) {
+            for (JSONObject obj : taskStatusObjs) {
+              out.println(obj);
+            }
+            out.flush();
+          }
+        }
+      }
+    };
+  }
+
+  private boolean isTearDownTask(TaskStatus status) {
+    return status.getTaskId().equals(getTearDownTask().getTaskId());
+  }
+
+  private boolean isSetUpTask(TaskStatus status) {
+    return status.getTaskId().equals(getSetUpTask().getTaskId());
+  }
+
+  private void dumpTaskStatus(TaskStatus previous, TaskStatus status) {
+    if (previous == null) {
+      handleDumpEvent(status);
+    } else {
+      if (!isFinalState(previous.getRunState())) {
+        if (isFinalState(status.getRunState())
+            || (status.getRunState() != previous.getRunState())) {
+          handleDumpEvent(status);
+        }
+      }
+    }
+  }
+
+  private TaskStatus retrieveSetupTask(TaskStatus[] taskStatusList) {
+    for (TaskStatus status : taskStatusList) {
+      if (isSetUpTask(status)) {
+        return status;
+      }
+    }
+    return null;
+  }
+
+  private TaskStatus retrieveTeardownTask(TaskStatus[] taskStatusList) {
+    for (TaskStatus status : taskStatusList) {
+      if (isTearDownTask(status)) {
+        return status;
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public Task getSetUpTask() {
+    return this.setupTask;
+  }
+
+  @VisibleForTesting
+  public Task getTearDownTask() {
+    return this.teardownTask;
+  }
+
+  @Private
+  @VisibleForTesting
+  public Map<WorkerId, WorkerStatus> getWorkerStatus() {
+    return this.workerIds;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Private
+  @VisibleForTesting
+  public void writeJobDoneMSG() {
+    if (!dumpJobDoneMSG) {
+      JSONObject obj = new JSONObject();
+      obj.put("JobId", this.master.getJobId().toString());
+      obj.put("JobState", "Finished");
+      obj.put("Message", JOB_DONE_MSG);
+      out.println(obj);
+      out.flush();
+      dumpJobDoneMSG = true;
+    }
+  }
+
+  public long getLastWorkerRegisterTime() {
+    return this.lastWorkerRegisterTime;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/WorkerStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/WorkerStatus.java
new file mode 100644
index 00000000000..6b293424c1b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/WorkerStatus.java
@@ -0,0 +1,8 @@
+package org.apache.hadoop.applications.mawo.server.master;
+
+public enum WorkerStatus {
+  REGISTERED,
+  SETUP,
+  RUNNING,
+  TEARDOWN
+}
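WorkAssigner's history logging is a drain-on-stop queue: handleDumpEvent() produces JSON events, the thread from createThread() consumes them in batches, and serviceStop() blocks new events, waits (bounded by getMasterDrainEventsTimeout()) for the queue to drain, then interrupts and joins the thread. A stripped-down, self-contained sketch of that pattern follows; the String payload and the simple sleep-based wait are simplifications of the real waitForDrained monitor with a deadline.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class DrainSketch {
      private final BlockingQueue<String> queue =
          new LinkedBlockingQueue<String>();
      private volatile boolean blockNewEvents = false;
      private volatile boolean stopped = false;
      private final Thread worker = new Thread(new Runnable() {
        public void run() {
          // Like the real dump thread, this spins when idle because
          // drainTo() does not block.
          while (!stopped && !Thread.currentThread().isInterrupted()) {
            List<String> batch = new ArrayList<String>();
            queue.drainTo(batch);
            for (String event : batch) {
              System.out.println(event); // stand-in for the JSON history log
            }
          }
        }
      });

      void start() {
        worker.start();
      }

      void put(String event) {
        if (!blockNewEvents) {
          queue.add(event);
        }
      }

      void stop() throws InterruptedException {
        blockNewEvents = true;       // refuse new events
        while (!queue.isEmpty()) {   // crude drain wait; WorkAssigner uses a
          Thread.sleep(10);          // waitForDrained monitor plus a timeout
        }
        stopped = true;
        worker.interrupt();          // wake the worker and let it exit
        worker.join();
      }
    }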
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/master/TestWorkAssigner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/master/TestWorkAssigner.java
new file mode 100644
index 00000000000..00161ce31e4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/master/TestWorkAssigner.java
@@ -0,0 +1,257 @@
+package org.apache.hadoop.applications.mawo.server.master;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.applications.mawo.server.common.SimpleTask;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.common.TaskId;
+import org.apache.hadoop.applications.mawo.server.common.TaskStatus;
+import org.apache.hadoop.applications.mawo.server.common.TaskStatus.State;
+import org.apache.hadoop.applications.mawo.server.common.TaskType;
+import org.apache.hadoop.applications.mawo.server.master.job.JobId;
+import org.apache.hadoop.applications.mawo.server.worker.WorkerId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestWorkAssigner {
+
+  @Test
+  public void testWorkAssigner() {
+    JobId jobId = new JobId(1);
+    TaskId setupTaskId = new TaskId(jobId, 0);
+    SimpleTask mockSetupTask = mock(SimpleTask.class);
+    when(mockSetupTask.getTaskId()).thenReturn(setupTaskId);
+
+    TaskId teardownTaskId = new TaskId(jobId, 5);
+    SimpleTask mockTeardownTask = mock(SimpleTask.class);
+    when(mockTeardownTask.getTaskId()).thenReturn(teardownTaskId);
+
+    TaskId task1_id = new TaskId(jobId, 1);
+    SimpleTask task1 = new SimpleTask(task1_id, null, "", 1);
+
+    TaskId task2_id = new TaskId(jobId, 2);
+    SimpleTask task2 = new SimpleTask(task2_id, null, "", 1);
+
+    Master mockMaster = mock(Master.class);
+    WorkAssigner workerAssigner = spy(new WorkAssigner(mockMaster));
+    when(workerAssigner.getSetUpTask()).thenReturn(mockSetupTask);
+    when(workerAssigner.getTearDownTask()).thenReturn(mockTeardownTask);
+    doNothing().when(workerAssigner).writeJobDoneMSG();
+    workerAssigner.addTask(task1);
+    workerAssigner.addTask(task2);
+
+    // Register the workers; expect to get the setup task.
+    WorkerId workerId1 = new WorkerId();
+    workerId1.setWorkerId("worker_1");
+    Task returnTask = workerAssigner.registerWorker(workerId1);
+    Assert.assertEquals(returnTask.getTaskId(), setupTaskId);
+    Assert.assertTrue(workerAssigner.getWorkerStatus().containsKey(workerId1));
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId1),
+        WorkerStatus.REGISTERED);
+
+    WorkerId workerId2 = new WorkerId();
+    workerId2.setWorkerId("worker_2");
+    returnTask = workerAssigner.registerWorker(workerId2);
+    Assert.assertEquals(returnTask.getTaskId(), setupTaskId);
+    Assert.assertTrue(workerAssigner.getWorkerStatus().containsKey(workerId2));
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId2),
+        WorkerStatus.REGISTERED);
+
+    WorkerId workerId3 = new WorkerId();
+    workerId3.setWorkerId("worker_3");
+    returnTask = workerAssigner.registerWorker(workerId3);
+    Assert.assertEquals(returnTask.getTaskId(), setupTaskId);
+    Assert.assertTrue(workerAssigner.getWorkerStatus().containsKey(workerId3));
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId3),
+        WorkerStatus.REGISTERED);
+
+    WorkerId workerId4 = new WorkerId();
+    workerId4.setWorkerId("worker_4");
+    returnTask = workerAssigner.registerWorker(workerId4);
+    Assert.assertEquals(returnTask.getTaskId(), setupTaskId);
+    Assert.assertTrue(workerAssigner.getWorkerStatus().containsKey(workerId4));
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId4),
+        WorkerStatus.REGISTERED);
+
+    List<TaskStatus> taskStatusList = new ArrayList<TaskStatus>();
+    // Mimic the heartbeats from the workers.
+
+    // When worker_1 is in REGISTERED state,
+    // we are waiting for a setup task status update.
+    // If the status list does not include the setup task status,
+    // we would ignore it and send back a null task. worker_1 would
+    // stay in REGISTERED state until it gets the setup task status update.
+    TaskId unknownId = new TaskId(jobId, 2000);
+    TaskStatus taskStatus = new TaskStatus(workerId1, unknownId,
+        State.RUNNING, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId1, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskType(), TaskType.NULL);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId1),
+        WorkerStatus.REGISTERED);
+
+    // worker_1 is still REGISTERED, but we receive the setup task state
+    // as FAILED; we would return the teardown task to worker_1
+    // and move the worker status to TEARDOWN.
+    taskStatus = new TaskStatus(workerId1, setupTaskId,
+        State.FAILED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId1, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskId(), teardownTaskId);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId1),
+        WorkerStatus.TEARDOWN);
+
+    // worker_2 status is REGISTERED, and we receive the setup task state
+    // as SUCCEEDED; we would transition the worker_2 state to RUNNING
+    // and return a task.
+    taskStatus = new TaskStatus(workerId2, setupTaskId,
+        State.SUCCEEDED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId2, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskId(), task1_id);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId2),
+        WorkerStatus.RUNNING);
+
+    // worker_3 status is REGISTERED, and we receive the setup task state
+    // as RUNNING; we would transition the worker_3 state from REGISTERED
+    // to SETUP. Still need to wait for the setup task to finish.
+    taskStatus = new TaskStatus(workerId3, setupTaskId, State.RUNNING, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId3, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskType(), TaskType.NULL);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId3),
+        WorkerStatus.SETUP);
+
+    // worker_3 status is SETUP, but we receive the setup task state
+    // as FAILED; we would return the teardown task to worker_3
+    // and move the worker status to TEARDOWN.
+    taskStatus = new TaskStatus(workerId3, setupTaskId,
+        State.FAILED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId3, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskId(), teardownTaskId);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId3),
+        WorkerStatus.TEARDOWN);
+
+    // Move worker_4 from REGISTERED state to SETUP state.
+    taskStatus = new TaskStatus(workerId4, setupTaskId, State.RUNNING, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId4, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskType(), TaskType.NULL);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId4),
+        WorkerStatus.SETUP);
+    // Move worker_4 from SETUP state to RUNNING state.
+    taskStatus = new TaskStatus(workerId4, setupTaskId,
+        State.SUCCEEDED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId4, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskId(), task2_id);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId4),
+        WorkerStatus.RUNNING);
+
+    // Right now, we still have two tasks running in worker_2 and worker_4.
+    Assert.assertFalse(workerAssigner.allTasksHaveFinished());
+    Assert.assertFalse(workerAssigner.isTeardownTaskDone());
+
+    // Finish the task running in worker_2.
+    // The task in worker_4 is still running, so worker_2 would still be
+    // in RUNNING state and get a NULL task.
+    taskStatus = new TaskStatus(workerId2, task1_id,
+        State.SUCCEEDED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId2, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskType(), TaskType.NULL);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId2),
+        WorkerStatus.RUNNING);
+
+    // Finish the task running in worker_4.
+    // All tasks finished, so the teardown task would be sent to worker_4.
+    taskStatus = new TaskStatus(workerId4, task2_id,
+        State.SUCCEEDED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId4, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskId(), teardownTaskId);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId4),
+        WorkerStatus.TEARDOWN);
+
+    // Another heartbeat from worker_2, which should get the teardown task.
+    taskStatusList.clear();
+    returnTask = workerAssigner.sendHeartbeat(workerId2, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskId(), teardownTaskId);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId2),
+        WorkerStatus.TEARDOWN);
+
+    Assert.assertTrue(workerAssigner.allTasksHaveFinished());
+
+    // Update the teardown task status for all the workers.
+    taskStatus = new TaskStatus(workerId1, teardownTaskId,
+        State.SUCCEEDED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId1, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskType(), TaskType.NULL);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId1),
+        WorkerStatus.TEARDOWN);
+
+    taskStatus = new TaskStatus(workerId2, teardownTaskId,
+        State.SUCCEEDED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId2, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskType(), TaskType.NULL);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId2),
+        WorkerStatus.TEARDOWN);
+
+    taskStatus = new TaskStatus(workerId3, teardownTaskId,
+        State.SUCCEEDED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId3, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskType(), TaskType.NULL);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId3),
+        WorkerStatus.TEARDOWN);
+
+    taskStatus = new TaskStatus(workerId4, teardownTaskId,
+        State.SUCCEEDED, "", "");
+    taskStatusList.clear();
+    taskStatusList.add(taskStatus);
+    returnTask = workerAssigner.sendHeartbeat(workerId4, taskStatusList.toArray(
+        new TaskStatus[taskStatusList.size()]));
+    Assert.assertEquals(returnTask.getTaskType(), TaskType.NULL);
+    Assert.assertEquals(workerAssigner.getWorkerStatus().get(workerId4),
+        WorkerStatus.TEARDOWN);
+
+    Assert.assertTrue(workerAssigner.isTeardownTaskDone());
+    verify(workerAssigner, times(1)).writeJobDoneMSG();
+  }
+}
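The test drives the master-side state machine directly; from a worker's point of view the same protocol looks like the loop below. This is a hypothetical sketch: a real worker would reach WorkAssigner through the Hadoop RPC server it binds on getRpcHostName()/getRpcServerPort(), would sleep between heartbeats, and the execute-and-report step is elided.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.applications.mawo.server.common.DieTask;
    import org.apache.hadoop.applications.mawo.server.common.NullTask;
    import org.apache.hadoop.applications.mawo.server.common.Task;
    import org.apache.hadoop.applications.mawo.server.common.TaskStatus;
    import org.apache.hadoop.applications.mawo.server.common.WorkAssignmentProtocol;
    import org.apache.hadoop.applications.mawo.server.worker.WorkerId;

    public class WorkerLoopSketch {
      public static void runWorker(WorkAssignmentProtocol assigner) {
        WorkerId workerId = new WorkerId();
        workerId.setWorkerId(assigner.getNewWorkerId().toString());

        // Registration always hands back the setup task first.
        Task current = assigner.registerWorker(workerId);
        List<TaskStatus> statuses = new ArrayList<TaskStatus>();

        while (true) {
          if (current instanceof DieTask) {
            assigner.deRegisterWorker(workerId);
            break;
          }
          if (!(current instanceof NullTask)) {
            // ... execute `current` and append its TaskStatus to `statuses`,
            // clearing statuses the master has already acknowledged ...
          }
          current = assigner.sendHeartbeat(workerId,
              statuses.toArray(new TaskStatus[statuses.size()]));
        }
      }
    }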