/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.applications.mawo.server.worker;

import java.util.Map;

import org.apache.hadoop.applications.mawo.server.common.CompositeTask;
import org.apache.hadoop.applications.mawo.server.common.TaskStatus;
import org.apache.log4j.Logger;

/**
 * Runs a {@link CompositeTask} on a dedicated thread and reports its
 * lifecycle through the {@link TaskRunner} state machinery.
 */
public class CompositeTaskRunner extends TaskRunner implements Runnable {

  private static final Logger LOG =
      Logger.getLogger(CompositeTaskRunner.class);

  /** Thread executing this task; created lazily by {@link #start()}. */
  private Thread thread;

  /** The composite task to execute. */
  private final CompositeTask compositeTask;

  /**
   * Create a runner for a composite task.
   *
   * @param workerId identity of the worker executing the task
   * @param compositeTask the task to run
   * @param workSpace worker workspace directory (may be a "$VAR" reference)
   * @param whiteListEnv extra environment variables to pass to the task
   */
  CompositeTaskRunner(WorkerId workerId, CompositeTask compositeTask,
      String workSpace, Map<String, String> whiteListEnv) {
    super(workerId, compositeTask, workSpace, whiteListEnv);
    this.compositeTask = compositeTask;
  }

  /**
   * Start the task on its own thread. Idempotent: once the thread has been
   * created, subsequent calls are no-ops.
   */
  public void start() {
    String threadName = this.compositeTask.getTaskId().toString();
    LOG.info("Starting thread " + threadName);
    if (thread == null) {
      thread = new Thread(this, threadName);
      thread.start();
      LOG.info("Thread " + threadName + " started");
    }
  }

  /**
   * Populate the task environment with worker identity and whitelisted
   * variables. Actual command execution is not implemented yet.
   *
   * @param env mutable task environment, updated in place
   * @param command the composite task command (currently unused)
   * @return exit code; always 0 until execution is implemented
   */
  @Override
  protected int executeCommand(Map<String, String> env, String command) {
    // Expose the worker's identity to the task via its environment.
    WorkerId workerId = this.getWorkerId();
    env.put("WorkerId", workerId.getWorkerId());
    env.put("WorkerIP", workerId.getIPAddress());
    env.put("WorkerHostName", workerId.getHostname().toString());

    Map<String, String> additionalEnv = this.getEnvVariables();
    if (additionalEnv != null && !additionalEnv.isEmpty()) {
      env.putAll(additionalEnv);
    }
    // TODO: execute each sub-command of the composite task serially.
    // Report success so the task state machine can proceed in the meantime.
    return 0;
  }

  @Override
  public void run() {
    try {
      setcurrentTaskState(TaskStatus.State.RUNNING, -1);

      // Read task file and execute each line serially.
      int exitCode = executeCommand(this.compositeTask.getEnvironment(),
          this.compositeTask.getTaskCmd());

      if (exitCode == 0) {
        setcurrentTaskState(TaskStatus.State.SUCCEEDED, exitCode);
      } else {
        setcurrentTaskState(TaskStatus.State.FAILED, exitCode);
      }
    } catch (Exception e) {
      // 137 mirrors the shell convention for a killed process (128 + SIGKILL).
      setcurrentTaskState(TaskStatus.State.KILLED, 137);
      LOG.error("Composite task " + compositeTask.getTaskId() + " failed", e);
    }
  }

  /**
   * Expire the task: interrupt its thread and mark it EXPIRE. Exit code 124
   * mirrors the coreutils `timeout` convention.
   */
  @Override
  public void expire() {
    // Guard against expire() arriving before start() ever created the thread.
    if (thread != null) {
      thread.interrupt();
    }
    setcurrentTaskState(TaskStatus.State.EXPIRE, 124);
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.applications.mawo.server.worker;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.applications.mawo.server.common.SimpleTask;
import org.apache.hadoop.applications.mawo.server.common.TaskStatus;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.log4j.Logger;

/**
 * Runs a {@link SimpleTask} on a dedicated thread by handing its command to
 * {@code /bin/bash}, redirecting stdout/stderr into per-task files under the
 * worker workspace.
 */
class SimpleTaskRunner extends TaskRunner implements Runnable {

  private static final Logger LOG = Logger.getLogger(SimpleTaskRunner.class);

  /** Thread executing this task; created lazily by {@link #start()}. */
  private Thread thread;

  /** The simple task to execute. */
  private final SimpleTask simpleTask;

  /** Shell executor for the task command; set by executeCommand(). */
  private ShellCommandExecutor shexec = null;

  /** Prefix for the task's stdout/stderr files: <workspace>/<taskId>. */
  private final String taskWorkspace;

  /**
   * Create a runner for a simple task.
   *
   * @param workerId identity of the worker executing the task
   * @param singleTask the task to run
   * @param workSpace worker workspace directory (may be a "$VAR" reference)
   * @param whiteListEnv extra environment variables to pass to the task
   */
  SimpleTaskRunner(WorkerId workerId, SimpleTask singleTask, String workSpace,
      Map<String, String> whiteListEnv) {
    super(workerId, singleTask, workSpace, whiteListEnv);
    this.simpleTask = singleTask;
    taskWorkspace = getWorkSpace() + "/" + getTaskId().toString();
    LOG.info("SimpleTaskRunner launched " + getTaskId());
  }

  /**
   * Start the task on its own thread. Idempotent: once the thread has been
   * created, subsequent calls are no-ops.
   */
  public void start() {
    LOG.info(getTaskId() + " state change to STARTED");
    if (thread == null) {
      thread = new Thread(this, getTaskId().toString());
      thread.start();
    }
  }

  /**
   * Execute the task command via {@code /bin/bash -c}, with worker identity
   * and whitelisted variables added to the environment.
   *
   * @param env mutable task environment, updated in place
   * @param command the shell command to run
   * @return the command's exit code, or -1 if execution itself failed
   */
  @Override
  protected int executeCommand(Map<String, String> env, String command) {
    try {
      // Expose the worker's identity to the task via its environment.
      WorkerId workerId = this.getWorkerId();
      env.put("WorkerId", workerId.getWorkerId());
      env.put("WorkerIP", workerId.getIPAddress());
      env.put("WorkerHostName", workerId.getHostname().toString());
      env.put("TaskId", getTaskId().toString());
      Map<String, String> additionalEnv = this.getEnvVariables();
      if (additionalEnv != null && !additionalEnv.isEmpty()) {
        env.putAll(additionalEnv);
      }

      setcurrentTaskState(TaskStatus.State.RUNNING, -1);
      LOG.info(getTaskId() + " state changed to RUNNING");
      shexec = new ShellCommandExecutor(
          new String[] {"/bin/bash", "-c", command + commandPostFix()},
          null,
          env);
      shexec.execute();
    } catch (IOException e) {
      LOG.error(getTaskId() + " command execution failed", e);
      // Never report success when execution itself failed: the executor's
      // exit code may still be 0 (e.g. the process never launched).
      int exitCode = (shexec == null) ? -1 : shexec.getExitCode();
      return exitCode == 0 ? -1 : exitCode;
    }
    return shexec.getExitCode();
  }

  @Override
  public void run() {
    try {
      // Read task file and execute each line serially.
      int exitCode = executeCommand(this.simpleTask.getEnvironment(),
          this.simpleTask.getTaskCmd());

      if (exitCode == 0) {
        setcurrentTaskState(TaskStatus.State.SUCCEEDED, exitCode);
        LOG.info(getTaskId() + " state changed to SUCCEEDED");
      } else {
        setcurrentTaskState(TaskStatus.State.FAILED, exitCode);
        LOG.info(getTaskId() + " state changed to FAILED");
      }
    } catch (Exception e) {
      // 137 mirrors the shell convention for a killed process (128 + SIGKILL).
      LOG.error(getTaskId() + " failed with an exception", e);
      setcurrentTaskState(TaskStatus.State.KILLED, 137);
      LOG.info(getTaskId() + " state changed to KILLED");
    }
  }

  /** Redirections appended to the task command: per-task stdout/stderr files. */
  private String commandPostFix() {
    return " 1>" + taskWorkspace + "_stdout" + " 2>" + taskWorkspace
        + "_stderr";
  }

  /**
   * Expire the task: interrupt its thread and mark it EXPIRE. Exit code 124
   * mirrors the coreutils `timeout` convention.
   */
  @Override
  public void expire() {
    // Guard against expire() arriving before start() ever created the thread.
    if (thread != null) {
      thread.interrupt();
    }
    setcurrentTaskState(TaskStatus.State.EXPIRE, 124);
    LOG.info(getTaskId() + " state changed to EXPIRE");
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.applications.mawo.server.worker;

import java.util.Map;

import org.apache.hadoop.applications.mawo.server.common.Task;
import org.apache.hadoop.applications.mawo.server.common.TaskId;
import org.apache.hadoop.applications.mawo.server.common.TaskStatus;

/**
 * Base class for task runners. Holds the task's identity, timeout, and
 * {@link TaskStatus}, and implements the state transition rules shared by
 * all task types. Subclasses implement the actual command execution.
 */
public abstract class TaskRunner {

  /** Identity of the task being run. */
  private final TaskId taskId;

  /** Task wall-time limit in seconds, as declared by the task. */
  private final long taskTimeout;

  /** Identity of the worker running this task. */
  private final WorkerId workerId;

  /** Workspace directory; may initially be a "$VAR" environment reference. */
  private String workSpace;

  /** Mutable status object shared with the worker's heartbeat reporting. */
  private final TaskStatus taskStatus;

  /** Whitelisted environment variables to forward to the task. */
  private final Map<String, String> whiteListEnv;

  /**
   * Create a runner for the given task.
   *
   * @param workerId identity of the worker executing the task
   * @param task the task to run
   * @param workSpace workspace directory or "$VAR" environment reference
   * @param whiteListEnv extra environment variables to pass to the task
   */
  public TaskRunner(WorkerId workerId, Task task, String workSpace,
      Map<String, String> whiteListEnv) {
    this.workerId = workerId;
    this.taskId = task.getTaskId();
    this.taskTimeout = task.getTimeout();
    this.taskStatus = new TaskStatus(workerId, taskId, task.getTaskCmd(),
        task.getTaskType().toString());
    this.setWorkSpace(workSpace);
    this.whiteListEnv = whiteListEnv;
  }

  /**
   * Execute the task command.
   *
   * @param env mutable task environment; implementations add worker identity
   * @param command the command to run
   * @return the command's exit code
   */
  protected abstract int executeCommand(Map<String, String> env,
      String command);

  /** Force the task into the EXPIRE state when its wall time is exceeded. */
  protected abstract void expire();

  /** @return the id of the task being run. */
  public TaskId getTaskId() {
    return taskId;
  }

  /** @return the task's wall-time limit in seconds. */
  public long getTimeout() {
    return this.taskTimeout;
  }

  /** @return the live status object for this task. */
  public TaskStatus getTaskStatus() {
    return taskStatus;
  }

  /**
   * Move the task to {@code currentState}, recording start/end times and the
   * exit code as appropriate. Transitions out of a final state are ignored,
   * so e.g. an expire() racing a normal completion cannot overwrite it.
   *
   * @param currentState the new run state
   * @param exitCode exit code to record for final states (ignored otherwise)
   */
  protected void setcurrentTaskState(TaskStatus.State currentState,
      int exitCode) {
    if (!inFinalState()) {
      if (currentState == TaskStatus.State.RUNNING) {
        taskStatus.setStartTime();
      }
      if (currentState == TaskStatus.State.SUCCEEDED
          || currentState == TaskStatus.State.FAILED
          || currentState == TaskStatus.State.KILLED
          || currentState == TaskStatus.State.EXPIRE) {
        taskStatus.setEndTime();
        taskStatus.setExitCode(exitCode);
      }
      taskStatus.setRunState(currentState);
    }
  }

  /**
   * Resolve and return the workspace path. A value of the form "$VAR" is
   * replaced (once) by the value of the VAR environment variable.
   *
   * @return the resolved workspace path
   * @throws IllegalStateException if the referenced variable is not set,
   *         instead of silently caching a null workspace
   */
  public String getWorkSpace() {
    if (workSpace != null && workSpace.startsWith("$")) {
      String workSpaceKey = workSpace.substring(1);
      String resolved = System.getenv(workSpaceKey);
      if (resolved == null) {
        throw new IllegalStateException("Workspace environment variable '"
            + workSpaceKey + "' is not set");
      }
      setWorkSpace(resolved);
    }
    return workSpace;
  }

  public void setWorkSpace(String workSpace) {
    this.workSpace = workSpace;
  }

  /** @return true if the task has reached a terminal run state. */
  private boolean inFinalState() {
    TaskStatus.State state = taskStatus.getRunState();
    return (state == TaskStatus.State.SUCCEEDED
        || state == TaskStatus.State.FAILED
        || state == TaskStatus.State.KILLED
        || state == TaskStatus.State.EXPIRE);
  }

  /** @return the identity of the worker running this task. */
  public WorkerId getWorkerId() {
    return this.workerId;
  }

  /** @return the whitelisted environment variables for this task. */
  public Map<String, String> getEnvVariables() {
    return this.whiteListEnv;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.applications.mawo.server.worker;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.applications.mawo.server.common.TaskStatus;
import org.apache.hadoop.applications.mawo.server.common.TaskStatus.State;
import org.apache.log4j.Logger;

/**
 * Enforces per-task wall-time limits: a background checker thread periodically
 * scans registered {@link TaskRunner}s and expires any RUNNING task whose
 * elapsed time exceeds its declared timeout.
 */
public class TaskWallTimeLimiter {

  static final Logger logger = Logger.getLogger(TaskWallTimeLimiter.class);

  /** Set by serviceStop(); read by the checker thread to exit its loop. */
  private volatile boolean stopped;

  /** Delay between scans, in milliseconds. */
  private final int monitorInterval;

  /** Background thread running the StatusChecker loop. */
  private Thread checkerThread;

  /** Runners currently being monitored; guarded by synchronized(this). */
  private final List<TaskRunner> running = new ArrayList<TaskRunner>();

  public TaskWallTimeLimiter() {
    this.monitorInterval = 30; // Milliseconds. TODO - hardcoded
  }

  /** Start the background checker thread. Must not already be stopped. */
  protected void serviceStart() throws Exception {
    assert !stopped : "starting when already stopped";
    checkerThread = new Thread(new StatusChecker());
    checkerThread.setName("Task Status Checker");
    checkerThread.start();
  }

  /** Stop the checker thread; safe to call before serviceStart(). */
  protected void serviceStop() throws Exception {
    stopped = true;
    if (checkerThread != null) {
      checkerThread.interrupt();
    }
  }

  /** Begin monitoring the given runner's wall time. */
  public synchronized void register(TaskRunner ob) {
    running.add(ob);
  }

  /** Stop monitoring the given runner (e.g. on normal completion). */
  public synchronized void unregister(TaskRunner ob) {
    running.remove(ob);
  }

  /**
   * Loop that scans registered runners every monitorInterval milliseconds and
   * expires any RUNNING task whose start time plus timeout has passed.
   */
  private class StatusChecker implements Runnable {
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        synchronized (TaskWallTimeLimiter.this) {
          Iterator<TaskRunner> iterator = running.iterator();
          long currentTime = System.currentTimeMillis();

          while (iterator.hasNext()) {
            TaskRunner entry = iterator.next();
            TaskStatus taskStatus = entry.getTaskStatus();
            long startTime = taskStatus.getStartTime();
            // Task timeouts are declared in seconds; wall clock is in ms.
            long taskTimeout = entry.getTimeout() * 1000;
            // startTime == 0 means the task has not started running yet.
            if (startTime > 0
                && currentTime > taskTimeout + startTime
                && entry.getTaskStatus().getRunState() == State.RUNNING) {
              logger.info("Task " + taskStatus.getTaskId()
                  + " has expired. startTime: " + startTime
                  + " taskTimeout: " + taskTimeout);
              // Remove via the iterator before expire() to avoid
              // ConcurrentModificationException.
              iterator.remove();
              entry.expire();
            }
          }
        }
        try {
          Thread.sleep(monitorInterval);
        } catch (InterruptedException e) {
          logger.info("TaskLivenessMonitor statusChecker thread interrupted");
          // Restore the interrupt flag for any outer observers before exiting.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.applications.mawo.server.worker;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.applications.mawo.server.common.CompositeTask;
import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
import org.apache.hadoop.applications.mawo.server.common.SimpleTask;
import org.apache.hadoop.applications.mawo.server.common.Task;
import org.apache.hadoop.applications.mawo.server.common.TaskId;
import org.apache.hadoop.applications.mawo.server.common.TaskStatus;
import org.apache.hadoop.applications.mawo.server.common.TaskType;
import org.apache.hadoop.applications.mawo.server.common.WorkAssignmentProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.service.AbstractService;
import org.apache.log4j.Logger;

/**
 * A client to the Master. Has a loop where this client connects to the RPC
 * server and looks for tasks. If no tasks are available, it waits. If a task
 * is found, it hands the Task to a TaskRunner which executes it.
 */
public class Worker extends AbstractService {

  private static final Logger LOG = Logger.getLogger(Worker.class);

  /** Max number of completed task statuses retained for heartbeats. */
  private static final int NUM_COMPLETE_TASKS_IN_MEMORY = 10;

  private final MawoConfiguration mawoConf;

  private final int rpcServerPort;
  private final String rpcServerHostname;
  /** Address of the master's work-assignment RPC server. */
  private final InetSocketAddress waAddress;

  private final WorkerId workerId;

  /** Root workspace directory for all tasks run by this worker. */
  private final String workSpace;

  /** Enforces per-task wall-time limits. */
  private TaskWallTimeLimiter taskWallTimeLimiter = null;

  /** Runners of currently-executing tasks, keyed by task id. */
  private Map<TaskId, TaskRunner> taskRunnerMap =
      new HashMap<TaskId, TaskRunner>();

  /** Statuses of currently-running tasks, reported on each heartbeat. */
  private Map<TaskId, TaskStatus> runningTaskStatusList =
      new HashMap<TaskId, TaskStatus>();

  /** Bounded FIFO of recently-completed task statuses. */
  private List<TaskStatus> completedTaskStatusList =
      new ArrayList<TaskStatus>(NUM_COMPLETE_TASKS_IN_MEMORY);

  /** All tasks that have ever expired on this worker. */
  private Map<TaskId, TaskStatus> expiredTaskMap =
      new HashMap<TaskId, TaskStatus>();

  /** Tasks that expired since the last heartbeat; cleared after sending. */
  private Map<TaskId, TaskStatus> newlyExpiredTaskMap =
      new HashMap<TaskId, TaskStatus>();

  /** Whitelisted environment variables forwarded to every task. */
  private Map<String, String> whiteListEnv = new HashMap<String, String>();

  /**
   * Create a worker from the MaWo configuration: resolves the master address,
   * creates the workspace directory, and captures whitelisted env variables.
   *
   * @param conf MaWo configuration
   */
  public Worker(MawoConfiguration conf) {
    super(Worker.class.getName());

    this.mawoConf = conf;

    this.rpcServerPort = conf.getRpcServerPort();
    this.rpcServerHostname = conf.getRpcHostName();
    this.waAddress = new InetSocketAddress(rpcServerHostname, rpcServerPort);

    this.workerId = new WorkerId();
    this.workSpace = conf.getWorkerWorkSpace();

    // Create the workspace for this worker; warn (rather than fail) if the
    // directory cannot be created so startup errors surface near their cause.
    File file = new File(workSpace);
    if (!file.exists() && !file.mkdirs()) {
      LOG.warn("Could not create worker workspace: " + workSpace);
    }

    taskWallTimeLimiter = new TaskWallTimeLimiter();

    List<String> variables = conf.getWorkerWhiteListEnv();
    if (variables != null && !variables.isEmpty()) {
      for (String key : variables) {
        String value = System.getenv(key);
        whiteListEnv.put(key, value);
      }
    }
  }

  /**
   * Connect to the master's work-assignment RPC server, wrapped in a
   * retry-forever proxy so transient master outages do not kill the worker.
   */
  private WorkAssignmentProtocol startClientToTheMaster() throws IOException {
    LOG.info("Starting a client to the master at " + waAddress);
    WorkAssignmentProtocol workAssignmentProtocol =
        RPC.getProxy(WorkAssignmentProtocol.class,
            RPC.getProtocolVersion(WorkAssignmentProtocol.class), waAddress,
            new Configuration());
    return (WorkAssignmentProtocol) RetryProxy.create(
        WorkAssignmentProtocol.class, workAssignmentProtocol,
        RetryPolicies.RETRY_FOREVER);
  }

  /** Launch a simple task: create a runner, register it, and start it. */
  private void startSimpleTask(SimpleTask simpleTask) throws IOException {
    TaskId simpleTaskId = simpleTask.getTaskId();
    LOG.info("Starting a simple task " + simpleTaskId);

    SimpleTaskRunner simpleTaskRunner =
        new SimpleTaskRunner(this.workerId, simpleTask, this.workSpace,
            this.whiteListEnv);
    this.taskRunnerMap.put(simpleTaskId, simpleTaskRunner);
    this.taskWallTimeLimiter.register(simpleTaskRunner);

    simpleTaskRunner.start();
  }

  /** Launch a composite task: create a runner, register it, and start it. */
  private void startCompositeTask(CompositeTask compositeTask) {
    TaskId compositeTaskId = compositeTask.getTaskId();
    LOG.info("Starting a composite task " + compositeTaskId);

    CompositeTaskRunner compositeTaskRunner =
        new CompositeTaskRunner(this.workerId, compositeTask, this.workSpace,
            this.whiteListEnv);
    this.taskRunnerMap.put(compositeTaskId, compositeTaskRunner);
    this.taskWallTimeLimiter.register(compositeTaskRunner);

    compositeTaskRunner.start();
  }

  /** Shut down worker-side services before exiting the heartbeat loop. */
  private void stopWorker(WorkAssignmentProtocol workAssignmentProtocol)
      throws Exception {
    LOG.info("Stopping the worker");
    // RPC.stopProxy(workAssignmentProtocol);
    taskWallTimeLimiter.serviceStop();
  }

  /**
   * Sweep all runners: move finished tasks out of the running map into the
   * completed list (bounded) or the expired maps, and refresh the status of
   * tasks still running.
   */
  private void updateTaskStatuses() {
    Iterator<Map.Entry<TaskId, TaskRunner>> it =
        this.taskRunnerMap.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<TaskId, TaskRunner> entry = it.next();
      TaskRunner runner = entry.getValue();
      TaskStatus currentTaskStatus = runner.getTaskStatus();
      TaskId taskId = entry.getKey();
      LOG.debug("TaskId : " + taskId + " status: "
          + currentTaskStatus.getRunState());
      if ((currentTaskStatus.getRunState() == TaskStatus.State.SUCCEEDED)
          || (currentTaskStatus.getRunState() == TaskStatus.State.KILLED)
          || (currentTaskStatus.getRunState() == TaskStatus.State.FAILED)
          || (currentTaskStatus.getRunState() == TaskStatus.State.EXPIRE)) {

        LOG.info("TaskId : " + entry.getKey() + " finished in "
            + (currentTaskStatus.getEndTime()
                - currentTaskStatus.getStartTime()) + "ms.");

        // Remove via the iterator to avoid ConcurrentModificationException.
        it.remove();
        this.taskWallTimeLimiter.unregister(runner);

        if (currentTaskStatus.getRunState() == TaskStatus.State.EXPIRE) {
          if (!expiredTaskMap.containsKey(taskId)) {
            expiredTaskMap.put(taskId, currentTaskStatus);
            newlyExpiredTaskMap.put(taskId, currentTaskStatus);
          }
        } else {
          // Keep at most NUM_COMPLETE_TASKS_IN_MEMORY entries; evict oldest.
          if (completedTaskStatusList.size() >= NUM_COMPLETE_TASKS_IN_MEMORY) {
            completedTaskStatusList.remove(0);
          }
          completedTaskStatusList.add(currentTaskStatus);
        }
        // Remove it from the runningTaskStatusList.
        this.runningTaskStatusList.remove(taskId);
      } else {
        this.runningTaskStatusList.put(taskId, currentTaskStatus);
      }
    }
  }

  /** @return true once the task has no runner left in the running map. */
  private boolean isTaskdone(Task task) {
    return taskRunnerMap.get(task.getTaskId()) == null;
  }

  /** @return true when the worker is at its concurrent-tasks limit. */
  private boolean workerHasReachedConcurrentTasksLimit() {
    return this.taskRunnerMap.size() >= mawoConf.getWorkerConcurrentTasksLimit();
  }

  /**
   * Main worker loop: register with the master, run the setup task to
   * completion, then heartbeat forever — reporting statuses and launching
   * whatever task the master assigns — until a DIE task arrives.
   */
  @Override
  protected void serviceStart() throws Exception {

    // Start task liveness monitor.
    taskWallTimeLimiter.serviceStart();

    super.serviceStart();

    // Create client.
    LOG.info("Starting Worker '" + this.workerId.getHostname() + "'");

    WorkAssignmentProtocol workAssignmentProtocol = startClientToTheMaster();

    Text newWorkerId = workAssignmentProtocol.getNewWorkerId();
    this.workerId.setWorkerId(newWorkerId.toString());

    Task setupTask = workAssignmentProtocol.registerWorker(this.workerId);
    startSetupTask(setupTask);

    // Wait for the setup task to finish before accepting regular work.
    while (true) {
      updateTaskStatuses();
      if (isTaskdone(setupTask)) {
        break;
      }
      Thread.sleep(60);
    }

    // Loop through to get single Task from WorkAssigner.
    while (true) {

      updateTaskStatuses();

      // If the worker is running the max number of permitted tasks,
      // wait here for a task slot to free up.
      while (workerHasReachedConcurrentTasksLimit()) {
        Thread.sleep(10);
        updateTaskStatuses();
      }

      // Create a full status list: running, completed, and newly expired.
      List<TaskStatus> fullTaskList =
          new ArrayList<TaskStatus>(runningTaskStatusList.values());
      fullTaskList.addAll(completedTaskStatusList);
      fullTaskList.addAll(newlyExpiredTaskMap.values());

      Thread.sleep(200); // To avoid a tight loop of heartbeats.

      // Send a heartbeat; the reply carries the next task assignment.
      Task task = workAssignmentProtocol.sendHeartbeat(this.workerId,
          fullTaskList.toArray(new TaskStatus[fullTaskList.size()]));
      newlyExpiredTaskMap.clear();

      if (task.getTaskType() == TaskType.SIMPLE) {
        // Submit task and add in TaskStatus Map.
        SimpleTask singleTask = new SimpleTask(task);
        startSimpleTask(singleTask);
      } else if (task.getTaskType() == TaskType.COMPOSITE) {
        CompositeTask compositeTask = new CompositeTask(task);
        startCompositeTask(compositeTask);
      } else if (task.getTaskType() == TaskType.NULL) {
        LOG.debug("Received a Null Task from heartbeat. Waiting ..");
      } else if (task.getTaskType() == TaskType.TEARDOWN) {
        // Wrap the Task rather than casting it: the heartbeat returns a plain
        // Task, so a direct (SimpleTask) cast would throw ClassCastException.
        startSimpleTask(new SimpleTask(task));
      } else if (task.getTaskType() == TaskType.DIE) {
        LOG.debug("Received a Die Task from heartbeat. Time to go ..");
        workAssignmentProtocol.deRegisterWorker(this.workerId);
        stopWorker(workAssignmentProtocol);
        return;
      } else {
        LOG.info("Received an invalid task-type from heartbeat: "
            + task.getTaskType());
      }
    }
  }

  /** Run the setup task the master returned at registration time. */
  private void startSetupTask(Task setupTask) throws IOException {
    LOG.info("Setup Task received: " + setupTask.toString());
    if (setupTask.getTaskType() == TaskType.SIMPLE) {
      SimpleTask simpleSetupTask = new SimpleTask(setupTask);
      startSimpleTask(simpleSetupTask);
    } else if (setupTask.getTaskType() == TaskType.COMPOSITE) {
      CompositeTask compositeTask = new CompositeTask(setupTask);
      startCompositeTask(compositeTask);
    } else if (setupTask.getTaskType() == TaskType.NULL) {
      LOG.info("Null Setup Task received .. ");
    } else {
      LOG.info("Invalid type of Setup Task receieved .. ");
    }
  }

  /** @return the identity of this worker. */
  public WorkerId getWorkerId() {
    return this.workerId;
  }

  /** Entry point: build a worker from MaWo configuration and start it. */
  public static void main(String[] args) {
    MawoConfiguration conf = new MawoConfiguration();
    Worker w = new Worker(conf);
    w.init(new Configuration());
    w.start();

    LOG.info("MaWo Worker is done .. ");
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.applications.mawo.server.worker;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.applications.mawo.server.common.SimpleTask;
import org.apache.hadoop.applications.mawo.server.common.TaskId;
import org.apache.hadoop.applications.mawo.server.common.TaskStatus;
import org.apache.hadoop.applications.mawo.server.master.job.JobId;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link SimpleTaskRunner}: runs a short shell command and checks
 * that the task reaches the SUCCEEDED state.
 */
public class TestTaskRunner {

  @Test
  public void testSimpleTaskRunner() throws Exception {

    // Set up a worker identity.
    WorkerId workerId1 = new WorkerId();
    workerId1.setWorkerId("worker1");

    // Build one simple task that sleeps for a second.
    JobId jobId = new JobId(1);
    TaskId taskId1 = new TaskId(jobId, 1);
    Map<String, String> taskEnv = new HashMap<String, String>();
    taskEnv.put("test", "test");
    SimpleTask task1 = new SimpleTask(taskId1, taskEnv, "sleep 1", 1);

    String workspace = "/tmp";
    Map<String, String> whiteListTestEnv = new HashMap<String, String>();

    SimpleTaskRunner simpleTestTaskRunner =
        new SimpleTaskRunner(workerId1, task1, workspace, whiteListTestEnv);

    simpleTestTaskRunner.start();

    // Give the one-second task time to finish.
    Thread.sleep(2000);

    TaskStatus simpleTestTaskStatus = simpleTestTaskRunner.getTaskStatus();

    // JUnit convention: expected value first, actual value second.
    Assert.assertEquals(TaskStatus.State.SUCCEEDED,
        simpleTestTaskStatus.getRunState());
  }
}