Index: core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (Revision 1383877) +++ core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (Arbeitskopie) @@ -288,16 +288,19 @@ @Override public long getSplitSize() { - // TODO Auto-generated method stub return 0; } @Override public long getPos() throws IOException { - // TODO Auto-generated method stub return 0; } + @Override + public TaskAttemptID getTaskId() { + return null; + } + } public static class TempSyncClient extends BSPPeerSyncClient { Index: core/src/main/java/org/apache/hama/pipes/Application.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/Application.java (Revision 0) +++ core/src/main/java/org/apache/hama/pipes/Application.java (Arbeitskopie) @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** MODIFIED FOR GPGPU Usage! **/ + +package org.apache.hama.pipes; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.bsp.TaskLog; + +/** + * This class is responsible for launching and communicating with the child + * process. + * + * Adapted from Hadoop Pipes. + * + */ +public class Application { + + private static final Log LOG = LogFactory.getLog(Application.class.getName()); + private ServerSocket serverSocket; + private Process process; + private Socket clientSocket; + + private DownwardProtocol downlink; + + static final boolean WINDOWS = System.getProperty("os.name").startsWith( + "Windows"); + + /** + * Start the child process to handle the task for us. 
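+   * The launch sequence is: open a server socket, export its port to the
+   * child through the hama.pipes.command.port environment variable, fork the
+   * executable shipped via the DistributedCache, and then wait for the child
+   * to either connect back (binary protocol) or speak over stdin/stdout
+   * (streaming protocol).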
+   *
+   * @param peer the current peer including the task's configuration
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  Application(BSPPeer peer) throws IOException, InterruptedException {
+
+    // this.peer = peer;
+
+    serverSocket = new ServerSocket(0);
+    Map<String, String> env = new HashMap<String, String>();
+    // add TMPDIR environment variable with the value of java.io.tmpdir
+    env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
+    env.put("hama.pipes.command.port",
+        Integer.toString(serverSocket.getLocalPort()));
+
+    /* Set Logging Environment from Configuration */
+    env.put("hama.pipes.logging",
+        peer.getConfiguration().getBoolean("hama.pipes.logging", false) ? "1"
+            : "0");
+    LOG.debug("DEBUG hama.pipes.logging: "
+        + peer.getConfiguration().getBoolean("hama.pipes.logging", false));
+
+    List<String> cmd = new ArrayList<String>();
+    String interpretor = peer.getConfiguration().get(
+        "hama.pipes.executable.interpretor");
+    if (interpretor != null) {
+      cmd.add(interpretor);
+    }
+
+    // Check whether the application will run on the GPU and pick the right
+    // executable
+    String executable = null;
+    try {
+      LOG.debug("DEBUG LocalCacheFilesCount: "
+          + DistributedCache.getLocalCacheFiles(peer.getConfiguration()).length);
+      for (Path u : DistributedCache.getLocalCacheFiles(peer.getConfiguration()))
+        LOG.debug("DEBUG LocalCacheFiles: " + u);
+
+      executable = DistributedCache.getLocalCacheFiles(peer.getConfiguration())[0]
+          .toString();
+
+      LOG.info("executable: " + executable);
+
+    } catch (Exception e) {
+      LOG.error("Executable: " + executable + " fs.default.name: "
+          + peer.getConfiguration().get("fs.default.name"));
+
+      throw new IOException("Executable is missing!");
+    }
+
+    if (!new File(executable).canExecute()) {
+      // LinuxTaskController sets +x permissions on all distcache files already.
+      // In case of DefaultTaskController, set permissions here.
+      FileUtil.chmod(executable, "u+x");
+    }
+    cmd.add(executable);
+
+    String additionalArgs = peer.getConfiguration().get(
+        "hama.pipes.executable.args");
+    if (additionalArgs != null && !additionalArgs.isEmpty()) {
+      String[] split = additionalArgs.split(" ");
+      for (String s : split)
+        cmd.add(s);
+    }
+
+    // wrap the command in a stdout/stderr capture
+    TaskAttemptID taskid = peer.getTaskId();
+    // we are starting the BSP task of the pipes job; this is not a cleanup
+    // attempt.
+    File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+    File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
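+    // A child could locate the command socket like this (hypothetical
+    // client-side sketch in Java; the real counterpart is usually a C++
+    // binary):
+    //
+    //   int port = Integer.parseInt(System.getenv("hama.pipes.command.port"));
+    //   Socket sock = new Socket("localhost", port);
+    //   DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+    //   // the first exchange on this wire is START plus the protocol version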
+    // Get the desired maximum length of task's logs.
+    long logLength = TaskLog.getTaskLogLength(peer.getConfiguration());
+    if (!peer.getConfiguration().getBoolean("hama.streaming.enabled", false)) {
+      cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength);
+    } else {
+      // use tee in streaming to get the output to file
+      cmd = TaskLog.captureOutAndErrorTee(null, cmd, stdout, stderr, logLength);
+    }
+
+    if (!stdout.getParentFile().exists()) {
+      stdout.getParentFile().mkdirs();
+      LOG.info("STDOUT: " + stdout.getParentFile().getAbsolutePath()
+          + " created!");
+    }
+    LOG.info("STDOUT: " + stdout.getAbsolutePath());
+
+    if (!stderr.getParentFile().exists()) {
+      stderr.getParentFile().mkdirs();
+      LOG.info("STDERR: " + stderr.getParentFile().getAbsolutePath()
+          + " created!");
+    }
+    LOG.info("STDERR: " + stderr.getAbsolutePath());
+
+    LOG.info("DEBUG: cmd: " + cmd);
+
+    process = runClient(cmd, env); // fork c++ binary
+
+    LOG.info("DEBUG: waiting for Client at "
+        + serverSocket.getLocalSocketAddress());
+
+    try {
+      if (peer.getConfiguration().getBoolean("hama.streaming.enabled", false)) {
+        downlink = new StreamingProtocol(peer, process.getOutputStream(),
+            process.getInputStream());
+      } else {
+        serverSocket.setSoTimeout(2000);
+        clientSocket = serverSocket.accept();
+        downlink = new BinaryProtocol(peer, clientSocket.getOutputStream(),
+            clientSocket.getInputStream());
+      }
+      downlink.start();
+
+    } catch (SocketException e) {
+      throw new SocketException(
+          "Timeout: client pipes application did not connect!");
+    }
+  }
+
+  /**
+   * Get the downward protocol object that can send commands down to the
+   * application.
+   *
+   * @return the downlink proxy
+   */
+  DownwardProtocol getDownlink() {
+    return downlink;
+  }
+
+  /**
+   * Wait for the application to finish.
+   *
+   * @return did the application finish correctly?
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  boolean waitForFinish() throws InterruptedException, IOException {
+    downlink.flush();
+    return downlink.waitForFinish();
+  }
+
+  /**
+   * Abort the application and wait for it to finish.
+   *
+   * @param t the exception that signalled the problem
+   * @throws IOException A wrapper around the exception that was passed in
+   */
+  void abort(Throwable t) throws IOException {
+    LOG.info("Aborting because of " + StringUtils.stringifyException(t));
+    try {
+      downlink.abort();
+      downlink.flush();
+    } catch (IOException e) {
+      // IGNORE cleanup problems
+    }
+    try {
+      downlink.waitForFinish();
+    } catch (Throwable ignored) {
+      process.destroy();
+    }
+    IOException wrapper = new IOException("pipe child exception");
+    wrapper.initCause(t);
+    throw wrapper;
+  }
+
+  /**
+   * Clean up the child process and socket.
+   *
+   * @throws IOException
+   */
+  void cleanup() throws IOException {
+    serverSocket.close();
+    try {
+      downlink.close();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Run a given command in a subprocess, including threads to copy its stdout
+   * and stderr to our stdout and stderr.
+ * + * @param command the command and its arguments + * @param env the environment to run the process in + * @return a handle on the process + * @throws IOException + */ + static Process runClient(List command, Map env) + throws IOException { + ProcessBuilder builder = new ProcessBuilder(command); + if (env != null) { + builder.environment().putAll(env); + } + Process result = builder.start(); + return result; + } + +} Index: core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java (Revision 0) +++ core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java (Arbeitskopie) @@ -0,0 +1,592 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.pipes; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.util.KeyValuePair; + +/** + * This protocol is a binary implementation of the Hama Pipes protocol. + * + * Adapted from Hadoop Pipes. + * + */ +public class BinaryProtocol + implements DownwardProtocol { + + protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class + .getName()); + public static final int CURRENT_PROTOCOL_VERSION = 0; + /** + * The buffer size for the command socket + */ + private static final int BUFFER_SIZE = 128 * 1024; + + protected final DataOutputStream stream; + protected final DataOutputBuffer buffer = new DataOutputBuffer(); + + private UplinkReaderThread uplink; + + private boolean hasTask = false; + protected final BSPPeer peer; + + /** + * The integer codes to represent the different messages. These must match the + * C++ codes or massive confusion will result. 
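+   *
+   * Every message begins with a variable-length int op-code (written with
+   * WritableUtils.writeVInt); Text and BytesWritable payloads follow as a
+   * vint length plus the raw bytes (see writeObject/readObject below).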
+ */ + protected static enum MessageType { + START(0), SET_BSPJOB_CONF(1), SET_INPUT_TYPES(2), RUN_SETUP(3), RUN_BSP(4), RUN_CLEANUP( + 5), READ_KEYVALUE(6), WRITE_KEYVALUE(7), GET_MSG(8), GET_MSG_COUNT(9), SEND_MSG( + 10), SYNC(11), GET_ALL_PEERNAME(12), GET_PEERNAME(13), GET_PEER_INDEX( + 14), GET_PEER_COUNT(15), GET_SUPERSTEP_COUNT(16), REOPEN_INPUT(17), CLEAR( + 18), CLOSE(19), ABORT(20), DONE(21), TASK_DONE(22), REGISTER_COUNTER(23), INCREMENT_COUNTER( + 24), LOG(25); + + final int code; + + MessageType(int code) { + this.code = code; + } + } + + protected class UplinkReaderThread extends Thread { + + protected DataInputStream inStream; + protected K2 key; + protected V2 value; + protected BSPPeer peer; + + @SuppressWarnings("unchecked") + public UplinkReaderThread(BSPPeer peer, + InputStream stream) throws IOException { + + inStream = new DataInputStream(new BufferedInputStream(stream, + BUFFER_SIZE)); + + this.peer = peer; + this.key = ReflectionUtils.newInstance((Class) peer + .getConfiguration().getClass("bsp.output.key.class", Object.class), + peer.getConfiguration()); + + this.value = ReflectionUtils.newInstance((Class) peer + .getConfiguration().getClass("bsp.output.value.class", Object.class), + peer.getConfiguration()); + } + + public void closeConnection() throws IOException { + inStream.close(); + } + + @Override + public void run() { + while (true) { + try { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + int cmd = readCommand(); + if (cmd == -1) + continue; + LOG.debug("Handling uplink command " + cmd); + + if (cmd == MessageType.WRITE_KEYVALUE.code) { // INCOMING + writeKeyValue(); + } else if (cmd == MessageType.READ_KEYVALUE.code) { // OUTGOING + readKeyValue(); + } else if (cmd == MessageType.INCREMENT_COUNTER.code) { // INCOMING + incrementCounter(); + } else if (cmd == MessageType.REGISTER_COUNTER.code) { // INCOMING + /* + * Is not used in HAMA -> Hadoop Pipes - maybe for performance, skip + * transferring group and name each INCREMENT + */ + } else if (cmd == MessageType.TASK_DONE.code) { // INCOMING + LOG.debug("Got MessageType.TASK_DONE"); + hasTask = false; + } else if (cmd == MessageType.DONE.code) { // INCOMING + LOG.debug("Pipe child done"); + return; + } else if (cmd == MessageType.SEND_MSG.code) { // INCOMING + sendMessage(); + } else if (cmd == MessageType.GET_MSG_COUNT.code) { // OUTGOING + getMessageCount(); + } else if (cmd == MessageType.GET_MSG.code) { // OUTGOING + getMessage(); + } else if (cmd == MessageType.SYNC.code) { // INCOMING + sync(); + } else if (cmd == MessageType.GET_ALL_PEERNAME.code) { // OUTGOING + getAllPeerNames(); + } else if (cmd == MessageType.GET_PEERNAME.code) { // OUTGOING + getPeerName(); + } else if (cmd == MessageType.GET_PEER_INDEX.code) { // OUTGOING + getPeerIndex(); + } else if (cmd == MessageType.GET_PEER_COUNT.code) { // OUTGOING + getPeerCount(); + } else if (cmd == MessageType.GET_SUPERSTEP_COUNT.code) { // OUTGOING + getSuperstepCount(); + } else if (cmd == MessageType.REOPEN_INPUT.code) { // INCOMING + reopenInput(); + } else if (cmd == MessageType.CLEAR.code) { // INCOMING + LOG.debug("Got MessageType.CLEAR"); + peer.clear(); + } else { + throw new IOException("Bad command code: " + cmd); + } + } catch (InterruptedException e) { + return; + } catch (Throwable e) { + onError(e); + throw new RuntimeException(e); + } + } + } + + protected void onError(Throwable e) { + LOG.error(StringUtils.stringifyException(e)); + } + + public int readCommand() throws IOException { + 
return WritableUtils.readVInt(inStream); + } + + public void reopenInput() throws IOException { + LOG.debug("Got MessageType.REOPEN_INPUT"); + peer.reopenInput(); + } + + public void getSuperstepCount() throws IOException { + WritableUtils.writeVInt(stream, MessageType.GET_SUPERSTEP_COUNT.code); + WritableUtils.writeVLong(stream, peer.getSuperstepCount()); + flush(); + LOG.debug("Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: " + + peer.getSuperstepCount()); + } + + public void getPeerCount() throws IOException { + WritableUtils.writeVInt(stream, MessageType.GET_PEER_COUNT.code); + WritableUtils.writeVInt(stream, peer.getNumPeers()); + flush(); + LOG.debug("Responded MessageType.GET_PEER_COUNT - NumPeers: " + + peer.getNumPeers()); + } + + public void getPeerIndex() throws IOException { + WritableUtils.writeVInt(stream, MessageType.GET_PEER_INDEX.code); + WritableUtils.writeVInt(stream, peer.getPeerIndex()); + flush(); + LOG.debug("Responded MessageType.GET_PEER_INDEX - PeerIndex: " + + peer.getPeerIndex()); + } + + public void getPeerName() throws IOException { + int id = readCommand(); + LOG.debug("Got MessageType.GET_PEERNAME id: " + id); + + WritableUtils.writeVInt(stream, MessageType.GET_PEERNAME.code); + if (id == -1) { // -1 indicates get own PeerName + Text.writeString(stream, peer.getPeerName()); + LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: " + + peer.getPeerName()); + + } else if ((id < -1) || (id >= peer.getNumPeers())) { + // if no PeerName for this index is found write emptyString + Text.writeString(stream, ""); + LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!"); + + } else { + Text.writeString(stream, peer.getPeerName(id)); + LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: " + + peer.getPeerName(id)); + } + flush(); + } + + public void getAllPeerNames() throws IOException { + LOG.debug("Got MessageType.GET_ALL_PEERNAME"); + WritableUtils.writeVInt(stream, MessageType.GET_ALL_PEERNAME.code); + WritableUtils.writeVInt(stream, peer.getAllPeerNames().length); + for (String s : peer.getAllPeerNames()) + Text.writeString(stream, s); + + flush(); + LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: " + + peer.getAllPeerNames().length); + } + + public void sync() throws IOException, SyncException, InterruptedException { + LOG.debug("Got MessageType.SYNC"); + peer.sync(); // this call blocks + } + + public void getMessage() throws IOException { + LOG.debug("Got MessageType.GET_MSG"); + WritableUtils.writeVInt(stream, MessageType.GET_MSG.code); + BytesWritable msg = peer.getCurrentMessage(); + if (msg != null) + writeObject(msg); + + flush(); + LOG.debug("Responded MessageType.GET_MSG - Message(BytesWritable) ");// +msg); + } + + public void getMessageCount() throws IOException { + WritableUtils.writeVInt(stream, MessageType.GET_MSG_COUNT.code); + WritableUtils.writeVInt(stream, peer.getNumCurrentMessages()); + flush(); + LOG.debug("Responded MessageType.GET_MSG_COUNT - Count: " + + peer.getNumCurrentMessages()); + } + + public void sendMessage() throws IOException { + String peerName = Text.readString(inStream); + BytesWritable msg = new BytesWritable(); + readObject(msg); + LOG.debug("Got MessageType.SEND_MSG to peerName: " + peerName); + peer.send(peerName, msg); + } + + public void incrementCounter() throws IOException { + // int id = WritableUtils.readVInt(inStream); + String group = Text.readString(inStream); + String name = Text.readString(inStream); + long amount = WritableUtils.readVLong(inStream); 
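+      // note: the wire order above is group, then name, while the call below
+      // passes name first - double-check against BSPPeer.incrementCounter's
+      // parameter order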
+      peer.incrementCounter(name, group, amount);
+    }
+
+    public void readKeyValue() throws IOException {
+      boolean nullinput = peer.getConfiguration().get("bsp.input.format.class") == null
+          || peer.getConfiguration().get("bsp.input.format.class")
+              .equals("org.apache.hama.bsp.NullInputFormat");
+
+      if (!nullinput) {
+
+        KeyValuePair pair = peer.readNext();
+
+        WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
+        if (pair != null) {
+          writeObject(pair.getKey());
+          writeObject(pair.getValue());
+
+          LOG.debug("Responded MessageType.READ_KEYVALUE - Key: "
+              + pair.getKey() + " Value: " + pair.getValue());
+
+        } else {
+          Text.writeString(stream, "");
+          Text.writeString(stream, "");
+          LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
+        }
+        flush();
+
+      } else {
+        /* TODO */
+        /* Send empty Strings to show no KeyValue pair is available */
+        WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
+        Text.writeString(stream, "");
+        Text.writeString(stream, "");
+        flush();
+        LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
+      }
+    }
+
+    public void writeKeyValue() throws IOException {
+      readObject(key); // string or binary only
+      readObject(value); // string or binary only
+      if (LOG.isDebugEnabled())
+        LOG.debug("Got MessageType.WRITE_KEYVALUE - Key: " + key + " Value: "
+            + value);
+      peer.write(key, value);
+    }
+
+    protected void readObject(Writable obj) throws IOException {
+      int numBytes = readCommand();
+      byte[] buffer;
+      // For BytesWritable and Text, use the specified length to set the length
+      // this causes the "obvious" translations to work. So that if you emit
+      // a string "abc" from C++, it shows up as "abc".
+      if (obj instanceof BytesWritable) {
+        buffer = new byte[numBytes];
+        inStream.readFully(buffer);
+        ((BytesWritable) obj).set(buffer, 0, numBytes);
+      } else if (obj instanceof Text) {
+        buffer = new byte[numBytes];
+        inStream.readFully(buffer);
+        ((Text) obj).set(buffer);
+      } else if (obj instanceof NullWritable) {
+        throw new IOException(
+            "Cannot read data into NullWritable! Check OutputClasses!");
+      } else {
+        /* TODO */
+        /* IntWritable, DoubleWritable */
+        throw new IOException(
+            "Hama Pipes only supports Text as key/value output!");
+        // obj.readFields(inStream);
+      }
+    }
+  }
+
+  /**
+   * An output stream that will save a copy of the data into a file.
+   */
+  private static class TeeOutputStream extends FilterOutputStream {
+    private OutputStream file;
+
+    TeeOutputStream(String filename, OutputStream base) throws IOException {
+      super(base);
+      file = new FileOutputStream(filename);
+    }
+
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      file.write(b, off, len);
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      file.write(b);
+      out.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      file.flush();
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      flush();
+      file.close();
+      out.close();
+    }
+  }
+
+  /**
+   * Create a proxy object that will speak the binary protocol on a socket.
+   * Upward messages are passed to the specified handler and downward
+   * messages are public methods on this object.
+   *
+   * @param sock The socket to communicate on.
+   * @param handler The handler for the received messages.
+   * @param key The object to read keys into.
+   * @param value The object to read values into.
+ * @param jobConfig The job's configuration + * @throws IOException + */ + public BinaryProtocol(BSPPeer peer, + OutputStream out, InputStream in) throws IOException { + this.peer = peer; + OutputStream raw = out; + + // If we are debugging, save a copy of the downlink commands to a file + if (Submitter.getKeepCommandFile(peer.getConfiguration())) { + raw = new TeeOutputStream("downlink.data", raw); + } + stream = new DataOutputStream(new BufferedOutputStream(raw, BUFFER_SIZE)); + uplink = getUplinkReader(peer, in); + + uplink.setName("pipe-uplink-handler"); + uplink.start(); + } + + public UplinkReaderThread getUplinkReader( + BSPPeer peer, InputStream sock) + throws IOException { + return new UplinkReaderThread(peer, sock); + } + + @Override + public boolean waitForFinish() throws IOException, InterruptedException { + // LOG.debug("waitForFinish... "+hasTask); + while (hasTask) { + try { + Thread.sleep(100); + // LOG.debug("waitForFinish... "+hasTask); + } catch (Exception e) { + LOG.error(e); + } + } + return hasTask; + } + + /** + * Close the connection and shutdown the handler thread. + * + * @throws IOException + * @throws InterruptedException + */ + @Override + public void close() throws IOException, InterruptedException { + // runCleanup(pipedInput,pipedOutput); + LOG.debug("closing connection"); + endOfInput(); + + uplink.interrupt(); + uplink.join(); + + uplink.closeConnection(); + stream.close(); + } + + @Override + public void start() throws IOException { + LOG.debug("starting downlink"); + WritableUtils.writeVInt(stream, MessageType.START.code); + WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION); + flush(); + LOG.debug("Sent MessageType.START"); + setBSPJob(peer.getConfiguration()); + } + + public void setBSPJob(Configuration conf) throws IOException { + WritableUtils.writeVInt(stream, MessageType.SET_BSPJOB_CONF.code); + List list = new ArrayList(); + for (Map.Entry itm : conf) { + list.add(itm.getKey()); + list.add(itm.getValue()); + } + WritableUtils.writeVInt(stream, list.size()); + for (String entry : list) { + Text.writeString(stream, entry); + } + flush(); + LOG.debug("Sent MessageType.SET_BSPJOB_CONF"); + } + + @Override + public void setInputTypes(String keyType, String valueType) + throws IOException { + WritableUtils.writeVInt(stream, MessageType.SET_INPUT_TYPES.code); + Text.writeString(stream, keyType); + Text.writeString(stream, valueType); + flush(); + LOG.debug("Sent MessageType.SET_INPUT_TYPES"); + } + + @Override + public void runSetup(boolean pipedInput, boolean pipedOutput) + throws IOException { + + WritableUtils.writeVInt(stream, MessageType.RUN_SETUP.code); + WritableUtils.writeVInt(stream, pipedInput ? 1 : 0); + WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0); + flush(); + hasTask = true; + LOG.debug("Sent MessageType.RUN_SETUP"); + } + + @Override + public void runBsp(boolean pipedInput, boolean pipedOutput) + throws IOException { + + WritableUtils.writeVInt(stream, MessageType.RUN_BSP.code); + WritableUtils.writeVInt(stream, pipedInput ? 1 : 0); + WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0); + flush(); + hasTask = true; + LOG.debug("Sent MessageType.RUN_BSP"); + } + + @Override + public void runCleanup(boolean pipedInput, boolean pipedOutput) + throws IOException { + + WritableUtils.writeVInt(stream, MessageType.RUN_CLEANUP.code); + WritableUtils.writeVInt(stream, pipedInput ? 1 : 0); + WritableUtils.writeVInt(stream, pipedOutput ? 
1 : 0); + flush(); + hasTask = true; + LOG.debug("Sent MessageType.RUN_CLEANUP"); + } + + public void endOfInput() throws IOException { + WritableUtils.writeVInt(stream, MessageType.CLOSE.code); + flush(); + LOG.debug("Sent close command"); + LOG.debug("Sent MessageType.CLOSE"); + } + + @Override + public void abort() throws IOException { + WritableUtils.writeVInt(stream, MessageType.ABORT.code); + flush(); + LOG.debug("Sent MessageType.ABORT"); + } + + @Override + public void flush() throws IOException { + stream.flush(); + } + + /** + * Write the given object to the stream. If it is a Text or BytesWritable, + * write it directly. Otherwise, write it to a buffer and then write the + * length and data to the stream. + * + * @param obj the object to write + * @throws IOException + */ + protected void writeObject(Writable obj) throws IOException { + // For Text and BytesWritable, encode them directly, so that they end up + // in C++ as the natural translations. + if (obj instanceof Text) { + Text t = (Text) obj; + int len = t.getLength(); + WritableUtils.writeVInt(stream, len); + stream.write(t.getBytes(), 0, len); + } else if (obj instanceof BytesWritable) { + BytesWritable b = (BytesWritable) obj; + int len = b.getLength(); + WritableUtils.writeVInt(stream, len); + stream.write(b.getBytes(), 0, len); + } else { + buffer.reset(); + obj.write(buffer); + int length = buffer.getLength(); + WritableUtils.writeVInt(stream, length); + stream.write(buffer.getData(), 0, length); + } + } +} Index: core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java (Revision 0) +++ core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java (Arbeitskopie) @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.pipes; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * The abstract description of the downward (from Java to C++) Pipes protocol. + * All of these calls are asynchronous and return before the message has been + * processed. + * + * Adapted from Hadoop Pipes. + * + */ +public interface DownwardProtocol { + + /** + * Start communication + * + * @throws IOException + */ + void start() throws IOException; + + /** + * Set the input types for Maps. 
+   *
+   * @param keyType the name of the key's type
+   * @param valueType the name of the value's type
+   * @throws IOException
+   */
+  void setInputTypes(String keyType, String valueType) throws IOException;
+
+  void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * The task should stop as soon as possible, because something has gone
+   * wrong.
+   *
+   * @throws IOException
+   */
+  void abort() throws IOException;
+
+  /**
+   * Flush the data through any buffers.
+   */
+  void flush() throws IOException;
+
+  /**
+   * Close the connection.
+   */
+  void close() throws IOException, InterruptedException;
+
+  boolean waitForFinish() throws IOException, InterruptedException;
+}
Index: core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java	(Revision 0)
+++ core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java	(Arbeitskopie)
@@ -0,0 +1,276 @@
+package org.apache.hama.pipes;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * Streaming protocol that inherits from the binary protocol. Basically it
+ * writes everything as text to the peer; each command is separated by a
+ * newline. To distinguish op-codes (like SET_BSPJOB_CONF) from normal output,
+ * op-codes are wrapped as %OP_CODE%=_possible_value.
+ *
+ * @param <K1> input key.
+ * @param <V1> input value.
+ * @param <K2> output key.
+ * @param <V2> output value.
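+ *
+ * A RUN_BSP command, for example, travels as the single line "%4%=" (see
+ * getProtocolString below), and barrier-style commands are acknowledged by
+ * the child with a line whose op-code field starts with ACK_, e.g.
+ * "%ACK_RUN_BSP%=" (see checkAcks).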
+ */ +public class StreamingProtocol + extends BinaryProtocol { + + private static final Pattern PROTOCOL_STRING_PATTERN = Pattern.compile("="); + + private final CyclicBarrier ackBarrier = new CyclicBarrier(2); + private boolean brokenBarrier = false; + + public StreamingProtocol(BSPPeer peer, + OutputStream out, InputStream in) throws IOException { + super(peer, out, in); + } + + public class StreamingUplinkReaderThread extends UplinkReaderThread { + + private BufferedReader reader; + + public StreamingUplinkReaderThread( + BSPPeer peer, InputStream stream) + throws IOException { + super(peer, stream); + reader = new BufferedReader(new InputStreamReader(inStream)); + } + + @Override + public void sendMessage() throws IOException { + String peerLine = reader.readLine(); + String msgLine = reader.readLine(); + peer.send(peerLine, new BytesWritable(msgLine.getBytes())); + } + + @Override + public void getMessage() throws IOException { + BytesWritable currentMessage = peer.getCurrentMessage(); + if (currentMessage != null) + writeLine(new String(currentMessage.getBytes())); + else + writeLine("%%-1%%"); + } + + @Override + public void getMessageCount() throws IOException { + writeLine("" + peer.getNumCurrentMessages()); + } + + @Override + public void getSuperstepCount() throws IOException { + writeLine("" + peer.getSuperstepCount()); + } + + @Override + public void getPeerName() throws IOException { + int id = Integer.parseInt(reader.readLine()); + if (id == -1) + writeLine(peer.getPeerName()); + else + writeLine(peer.getPeerName(id)); + } + + @Override + public void getPeerIndex() throws IOException { + writeLine("" + peer.getPeerIndex()); + } + + @Override + public void getAllPeerNames() throws IOException { + writeLine("" + peer.getAllPeerNames().length); + for (String s : peer.getAllPeerNames()) { + writeLine(s); + } + } + + @Override + public void getPeerCount() throws IOException { + writeLine("" + peer.getAllPeerNames().length); + } + + @Override + public void sync() throws IOException, SyncException, InterruptedException { + peer.sync(); + writeLine(getProtocolString(MessageType.SYNC) + "_SUCCESS"); + } + + @Override + public void writeKeyValue() throws IOException { + String key = reader.readLine(); + String value = reader.readLine(); + peer.write(new Text(key), new Text(value)); + } + + @Override + public void readKeyValue() throws IOException { + KeyValuePair readNext = peer.readNext(); + if (readNext == null) { + writeLine("%%-1%%"); + writeLine("%%-1%%"); + } else { + writeLine(readNext.getKey() + ""); + writeLine(readNext.getValue() + ""); + } + } + + @Override + public void reopenInput() throws IOException { + peer.reopenInput(); + } + + @Override + public int readCommand() throws IOException { + String readLine = reader.readLine(); + if (readLine != null && !readLine.isEmpty()) { + String[] split = PROTOCOL_STRING_PATTERN.split(readLine, 2); + split[0] = split[0].replace("%", ""); + if (checkAcks(split)) + return -1; + try { + int parseInt = Integer.parseInt(split[0]); + if (parseInt == BinaryProtocol.MessageType.LOG.code) { + LOG.info(split[1]); + return -1; + } + return parseInt; + } catch (NumberFormatException e) { + e.printStackTrace(); + } + } else { + return -1; + } + return -2; + } + + @Override + protected void onError(Throwable e) { + super.onError(e); + // break the barrier if we had an error + ackBarrier.reset(); + brokenBarrier = true; + } + + private boolean checkAcks(String[] readLine) { + if (readLine[0].startsWith("ACK_")) { + try { + ackBarrier.await(); + } 
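+      // the two-party barrier pairs this uplink reader thread with the
+      // submitting thread blocked in start() or waitOnAck()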
catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + return true; + } + return false; + } + + } + + @Override + public void start() throws IOException { + writeLine(MessageType.START, null); + writeLine("" + CURRENT_PROTOCOL_VERSION); + setBSPJob(peer.getConfiguration()); + try { + ackBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + + @Override + public void setBSPJob(Configuration conf) throws IOException { + writeLine(MessageType.SET_BSPJOB_CONF, null); + List list = new ArrayList(); + for (Map.Entry itm : conf) { + list.add(itm.getKey()); + list.add(itm.getValue()); + } + writeLine(list.size()); + for (String entry : list) { + writeLine(entry); + } + flush(); + } + + @Override + public void runSetup(boolean pipedInput, boolean pipedOutput) + throws IOException { + writeLine(MessageType.RUN_SETUP, null); + waitOnAck(); + } + + @Override + public void runBsp(boolean pipedInput, boolean pipedOutput) + throws IOException { + writeLine(MessageType.RUN_BSP, null); + waitOnAck(); + } + + public void waitOnAck() { + try { + if (!brokenBarrier) + ackBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + + @Override + public void runCleanup(boolean pipedInput, boolean pipedOutput) + throws IOException { + writeLine(MessageType.RUN_CLEANUP, null); + waitOnAck(); + } + + @Override + public UplinkReaderThread getUplinkReader( + BSPPeer peer, InputStream in) + throws IOException { + return new StreamingUplinkReaderThread(peer, in); + } + + public void writeLine(int msg) throws IOException { + writeLine("" + msg); + } + + public void writeLine(String msg) throws IOException { + stream.write((msg + "\n").getBytes()); + stream.flush(); + } + + public void writeLine(MessageType type, String msg) throws IOException { + stream.write((getProtocolString(type) + (msg == null ? "" : msg) + "\n") + .getBytes()); + stream.flush(); + } + + public String getProtocolString(MessageType type) { + return "%" + type.code + "%="; + } + +} Index: core/src/main/java/org/apache/hama/pipes/Submitter.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/Submitter.java (Revision 0) +++ core/src/main/java/org/apache/hama/pipes/Submitter.java (Arbeitskopie) @@ -0,0 +1,584 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** MODIFIED FOR GPGPU Usage! 
**/ +//BSPJob <--> JobConf + +package org.apache.hama.pipes; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.StringTokenizer; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.Parser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.FileInputFormat; +import org.apache.hama.bsp.FileOutputFormat; +import org.apache.hama.bsp.HashPartitioner; +import org.apache.hama.bsp.InputFormat; +import org.apache.hama.bsp.OutputFormat; +import org.apache.hama.bsp.Partitioner; + +/** + * The main entry point and job submitter. It may either be used as a command + * line-based or API-based method to launch Pipes jobs. + * + * Adapted from Hadoop Pipes. + * + */ +public class Submitter implements Tool { + + protected static final Log LOG = LogFactory.getLog(Submitter.class); + private HamaConfiguration conf; + + public Submitter() { + this.conf = new HamaConfiguration(); + } + + public Submitter(HamaConfiguration conf) { + setConf(conf); + } + + @Override + public HamaConfiguration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = (HamaConfiguration) conf; + } + + /** + * Get the URI of the CPU application's executable. + * + * @param conf + * @return the URI where the application's executable is located + */ + public static String getExecutable(Configuration conf) { + return conf.get("hama.pipes.executable"); + } + + /** + * Set the URI for the CPU application's executable. Normally this is a hdfs: + * location. + * + * @param conf + * @param executable The URI of the application's executable. + */ + public static void setExecutable(Configuration conf, String executable) { + conf.set("hama.pipes.executable", executable); + } + + public static String getCPUExecutable(Configuration conf) { + return getExecutable(conf); + } + + public static void setCPUExecutable(Configuration conf, String executable) { + setExecutable(conf, executable); + } + + /** + * Set the URI for the GPU application's executable. Normally this is a hdfs: + * location. + * + * @param conf + * @param executable The URI of the application's executable. + */ + public static void setGPUExecutable(Configuration conf, String executable) { + conf.set("hama.pipes.gpu.executable", executable); + } + + /** + * Get the URI of the GPU application's executable. + * + * @param conf + * @return the URI where the application's executable is located + */ + public static String getGPUExecutable(Configuration conf) { + return conf.get("hama.pipes.gpu.executable"); + } + + /** + * Set whether the BSP is written in Java. 
+ * + * @param conf the configuration to modify + * @param value the new value + */ + public static void setIsJavaBSP(Configuration conf, boolean value) { + conf.setBoolean("hama.pipes.java.bsp", value); + } + + /** + * Check whether the job is using a Java BSP. + * + * @param conf the configuration to check + * @return is it a Java BSP? + */ + public static boolean getIsJavaBSP(Configuration conf) { + return conf.getBoolean("hama.pipes.java.bsp", false); + } + + /** + * Set whether the job is using a Java RecordReader. + * + * @param conf the configuration to modify + * @param value the new value + */ + public static void setIsJavaRecordReader(Configuration conf, boolean value) { + conf.setBoolean("hama.pipes.java.recordreader", value); + } + + /** + * Check whether the job is using a Java RecordReader + * + * @param conf the configuration to check + * @return is it a Java RecordReader? + */ + public static boolean getIsJavaRecordReader(Configuration conf) { + return conf.getBoolean("hama.pipes.java.recordreader", false); + } + + /** + * Set whether the job will use a Java RecordWriter. + * + * @param conf the configuration to modify + * @param value the new value to set + */ + public static void setIsJavaRecordWriter(Configuration conf, boolean value) { + conf.setBoolean("hama.pipes.java.recordwriter", value); + } + + /** + * Will the reduce use a Java RecordWriter? + * + * @param conf the configuration to check + * @return true, if the output of the job will be written by Java + */ + public static boolean getIsJavaRecordWriter(Configuration conf) { + return conf.getBoolean("hama.pipes.java.recordwriter", false); + } + + /** + * Set the configuration, if it doesn't already have a value for the given + * key. + * + * @param conf the configuration to modify + * @param key the key to set + * @param value the new "default" value to set + */ + private static void setIfUnset(Configuration conf, String key, String value) { + if (conf.get(key) == null) { + conf.set(key, value); + } + } + + /** + * Save away the user's original partitioner before we override it. + * + * @param conf the configuration to modify + * @param cls the user's partitioner class + */ + static void setJavaPartitioner(Configuration conf, Class cls) { + conf.set("hama.pipes.partitioner", cls.getName()); + } + + /** + * Get the user's original partitioner. + * + * @param conf the configuration to look in + * @return the class that the user submitted + */ + @SuppressWarnings("rawtypes") + static Class getJavaPartitioner(Configuration conf) { + return conf.getClass("hama.pipes.partitioner", HashPartitioner.class, + Partitioner.class); + } + + /** + * Does the user want to keep the command file for debugging? If this is true, + * pipes will write a copy of the command data to a file in the task directory + * named "downlink.data", which may be used to run the C++ program under the + * debugger. You probably also want to set + * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from + * being deleted. To run using the data file, set the environment variable + * "hadoop.pipes.command.file" to point to the file. + * + * @param conf the configuration to check + * @return will the framework save the command file? 
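+   *
+   * A typical API-driven setup might look like this (sketch; the HDFS path is
+   * a placeholder):
+   *
+   * <pre>
+   * HamaConfiguration conf = new HamaConfiguration();
+   * BSPJob job = new BSPJob(conf);
+   * Submitter.setExecutable(job.getConf(), "hdfs:///apps/pipes/mybsp");
+   * Submitter.setKeepCommandFile(job.getConf(), true); // keep downlink.data
+   * Submitter.runJob(job);
+   * </pre>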
+ */ + public static boolean getKeepCommandFile(Configuration conf) { + return conf.getBoolean("hama.pipes.command-file.keep", false); + } + + /** + * Set whether to keep the command file for debugging + * + * @param conf the configuration to modify + * @param keep the new value + */ + public static void setKeepCommandFile(Configuration conf, boolean keep) { + conf.setBoolean("hama.pipes.command-file.keep", keep); + } + + /** + * Submit a job to the cluster. All of the necessary modifications to the job + * to run under pipes are made to the configuration. + * + * @param conf the job to submit to the cluster (MODIFIED) + * @throws IOException + */ + public static void runJob(BSPJob job) throws IOException { + setupPipesJob(job); + BSPJobClient.runJob(job); + } + + private static void setupPipesJob(BSPJob job) throws IOException { + // default map output types to Text + if (!getIsJavaBSP(job.getConf())) { + job.setBspClass(PipesBSP.class); + job.setJarByClass(PipesBSP.class); + + // Save the user's partitioner and hook in our's. + // setJavaPartitioner(job, job.getPartitionerClass()); + // job.setPartitionerClass(PipesPartitioner.class); + } + /* + * if (!getIsJavaReducer(conf)) { conf.setReducerClass(PipesReducer.class); + * if (!getIsJavaRecordWriter(conf)) { + * conf.setOutputFormat(NullOutputFormat.class); } } + */ + + String textClassname = Text.class.getName(); + setIfUnset(job.getConf(), "bsp.input.key.class", textClassname); + setIfUnset(job.getConf(), "bsp.input.value.class", textClassname); + setIfUnset(job.getConf(), "bsp.output.key.class", textClassname); + setIfUnset(job.getConf(), "bsp.output.value.class", textClassname); + + // TODO Set default Job name + setIfUnset(job.getConf(), "bsp.job.name", "Hama Pipes Job"); + + LOG.info("DEBUG: isJavaRecordReader: " + + getIsJavaRecordReader(job.getConf())); + LOG.info("DEBUG: BspClass: " + job.getBspClass().getName()); + // conf.setInputFormat(NLineInputFormat.class); + LOG.info("DEBUG: InputFormat: " + job.getInputFormat()); + LOG.info("DEBUG: InputKeyClass: " + job.getInputKeyClass().getName()); + LOG.info("DEBUG: InputValueClass: " + job.getInputValueClass().getName()); + LOG.info("DEBUG: OutputKeyClass: " + job.getOutputKeyClass().getName()); + LOG.info("DEBUG: OutputValueClass: " + job.getOutputValueClass().getName()); + + if ((!job.getOutputKeyClass().getName().equals(textClassname)) + || (!job.getOutputValueClass().getName().equals(textClassname))) + throw new IllegalArgumentException( + "Hama Pipes does only support Text as Key/Value output!"); + + // Use PipesNonJavaInputFormat if necessary to handle progress reporting + // from C++ RecordReaders ... 
+ /* + * if (!getIsJavaRecordReader(job) && !getIsJavaMapper(job)) { + * job.setClass("mapred.pipes.user.inputformat", + * job.getInputFormat().getClass(), InputFormat.class); + * job.setInputFormat(PipesNonJavaInputFormat.class); } + */ + + LOG.info("DEBUG: bsp.master.address: " + + job.getConf().get("bsp.master.address")); + LOG.info("DEBUG: bsp.local.tasks.maximum: " + + job.getConf().get("bsp.local.tasks.maximum")); + LOG.info("DEBUG: NumBspTask: " + job.getNumBspTask()); + LOG.info("DEBUG: fs.default.name: " + job.getConf().get("fs.default.name")); + + // String exec = getExecutable(conf); + String cpubin = getCPUExecutable(job.getConf()); + String gpubin = getGPUExecutable(job.getConf()); + LOG.info("DEBUG: CPUbin = '" + cpubin + "'"); + LOG.info("DEBUG: GPUbin = '" + gpubin + "'"); + // if (exec == null) { + if (cpubin == null && gpubin == null) { + throw new IllegalArgumentException("No CPU or GPU application defined."); + } + // add default debug script only when executable is expressed as + // # + // if (exec.contains("#")) { + /* + * if (cpubin!=null && cpubin.contains("#") || gpubin!=null && + * gpubin.contains("#")) { DistributedCache.createSymlink(job.getConf()); // + * set default gdb commands for map and reduce task String defScript = + * "$HADOOP_HOME/src/c++/pipes/debug/pipes-default-script"; + * setIfUnset(job,"mapred.map.task.debug.script",defScript); + * setIfUnset(job,"mapred.reduce.task.debug.script",defScript); } + */ + + URI[] fileCache = DistributedCache.getCacheFiles(job.getConf()); + int count = ((cpubin != null) && (gpubin != null)) ? 2 : 1; + if (fileCache == null) { + fileCache = new URI[count]; + } else { + URI[] tmp = new URI[fileCache.length + count]; + System.arraycopy(fileCache, 0, tmp, count, fileCache.length); + fileCache = tmp; + } + + if (cpubin != null) { + try { + fileCache[0] = new URI(cpubin); + } catch (URISyntaxException e) { + IOException ie = new IOException("Problem parsing execable URI " + + cpubin); + ie.initCause(e); + throw ie; + } + } + if (gpubin != null) { + try { + fileCache[1] = new URI(gpubin); + } catch (URISyntaxException e) { + IOException ie = new IOException("Problem parsing execable URI " + + gpubin); + ie.initCause(e); + throw ie; + } + } + DistributedCache.setCacheFiles(fileCache, job.getConf()); + } + + /** + * A command line parser for the CLI-based Pipes job submitter. + */ + static class CommandLineParser { + private Options options = new Options(); + + void addOption(String longName, boolean required, String description, + String paramName) { + OptionBuilder.withArgName(paramName); + OptionBuilder.hasArgs(1); + OptionBuilder.withDescription(description); + OptionBuilder.isRequired(required); + Option option = OptionBuilder.create(longName); + options.addOption(option); + } + + void addArgument(String name, boolean required, String description) { + OptionBuilder.withArgName(name); + OptionBuilder.hasArgs(1); + OptionBuilder.withDescription(description); + OptionBuilder.isRequired(required); + Option option = OptionBuilder.create(); + options.addOption(option); + + } + + Parser createParser() { + Parser result = new BasicParser(); + return result; + } + + void printUsage() { + // The CLI package should do this for us, but I can't figure out how + // to make it print something reasonable. 
+      System.out.println("bin/hama pipes");
+      System.out.println("  [-input <path>] // Input directory");
+      System.out.println("  [-output <path>] // Output directory");
+      System.out.println("  [-jar <jar file>] // jar filename");
+      System.out.println("  [-inputformat <class>] // InputFormat class");
+      System.out.println("  [-bsp <class>] // Java BSP class");
+      System.out.println("  [-partitioner <class>] // Java Partitioner");
+      System.out.println("  [-combiner <class>] // Java Combiner class");
+      System.out.println("  [-outputformat <class>] // Java RecordWriter");
+      System.out.println("  [-program <executable>] // executable URI");
+      System.out
+          .println("  [-cpubin <executable>] // URI to application cpu executable");
+      System.out
+          .println("  [-gpubin <executable>] // URI to application gpu executable");
+      System.out.println();
+      GenericOptionsParser.printGenericCommandUsage(System.out);
+    }
+  }
+
+  private static <InterfaceType> Class<? extends InterfaceType> getClass(
+      CommandLine cl, String key, HamaConfiguration conf,
+      Class<InterfaceType> cls) throws ClassNotFoundException {
+
+    return conf.getClassByName(cl.getOptionValue(key)).asSubclass(cls);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    CommandLineParser cli = new CommandLineParser();
+    if (args.length == 0) {
+      cli.printUsage();
+      return 1;
+    }
+
+    LOG.info("DEBUG: Hama pipes Submitter started!");
+
+    cli.addOption("input", false, "input path for bsp", "path");
+    cli.addOption("output", false, "output path from bsp", "path");
+
+    cli.addOption("jar", false, "job jar file", "path");
+    cli.addOption("inputformat", false, "java classname of InputFormat",
+        "class");
+    // cli.addArgument("javareader", false, "is the RecordReader in Java");
+
+    cli.addOption("bsp", false, "java classname of bsp", "class");
+    cli.addOption("partitioner", false, "java classname of Partitioner",
+        "class");
+    cli.addOption("outputformat", false, "java classname of OutputFormat",
+        "class");
+
+    cli.addOption(
+        "jobconf",
+        false,
+        "\"n1=v1,n2=v2,..\" (Deprecated) Optional.
Add or override a JobConf property.", + "key=val"); + + cli.addOption("program", false, "URI to application executable", "class"); + cli.addOption("cpubin", false, "URI to application cpu executable", "class"); + cli.addOption("gpubin", false, "URI to application gpu executable", "class"); + Parser parser = cli.createParser(); + try { + + // check generic arguments -conf + LOG.debug("DEBUG: execute GenericOptionsParser"); + GenericOptionsParser genericParser = new GenericOptionsParser(getConf(), + args); + // get other arguments + CommandLine results = parser.parse(cli.options, + genericParser.getRemainingArgs()); + LOG.debug("DEBUG: NormalArguments: " + Arrays.toString(results.getArgs())); + + BSPJob job = new BSPJob(getConf()); + + if (results.hasOption("input")) { + FileInputFormat.setInputPaths(job, results.getOptionValue("input")); + } + if (results.hasOption("output")) { + FileOutputFormat.setOutputPath(job, + new Path(results.getOptionValue("output"))); + } + if (results.hasOption("jar")) { + job.setJar(results.getOptionValue("jar")); + } + + if (results.hasOption("inputformat")) { + setIsJavaRecordReader(job.getConf(), true); + job.setInputFormat(getClass(results, "inputformat", conf, + InputFormat.class)); + } + if (results.hasOption("outputformat")) { + setIsJavaRecordWriter(job.getConf(), true); + job.setOutputFormat(getClass(results, "outputformat", conf, + OutputFormat.class)); + } + + if (results.hasOption("bsp")) { + setIsJavaBSP(job.getConf(), true); + job.setBspClass(getClass(results, "bsp", conf, BSP.class)); + } + /* + * if (results.hasOption("partitioner")) { + * job.setPartitionerClass(getClass(results, "partitioner", conf, + * Partitioner.class)); } if (results.hasOption("combiner")) { + * //setIsJavaReducer(job, true); job.setCombinerClass(getClass(results, + * "combiner", conf, Combiner.class)); } + */ + + if (results.hasOption("jobconf")) { + LOG.warn("-jobconf option is deprecated, please use -D instead."); + String options = results.getOptionValue("jobconf"); + StringTokenizer tokenizer = new StringTokenizer(options, ","); + while (tokenizer.hasMoreTokens()) { + String keyVal = tokenizer.nextToken().trim(); + String[] keyValSplit = keyVal.split("=", 2); + job.set(keyValSplit[0], keyValSplit[1]); + } + } + + if (results.hasOption("program")) { + setExecutable(job.getConf(), results.getOptionValue("program")); + } + if (results.hasOption("cpubin")) { + setCPUExecutable(job.getConf(), results.getOptionValue("cpubin")); + } + if (results.hasOption("gpubin")) { + setGPUExecutable(job.getConf(), results.getOptionValue("gpubin")); + } + + // if they gave us a jar file, include it into the class path + String jarFile = job.getJar(); + if (jarFile != null) { + @SuppressWarnings("deprecation") + final URL[] urls = new URL[] { FileSystem.getLocal(conf) + .pathToFile(new Path(jarFile)).toURL() }; + // FindBugs complains that creating a URLClassLoader should be + // in a doPrivileged() block. + ClassLoader loader = AccessController + .doPrivileged(new PrivilegedAction() { + @Override + public ClassLoader run() { + return new URLClassLoader(urls); + } + }); + conf.setClassLoader(loader); + } + + runJob(job); + return 0; + } catch (ParseException pe) { + LOG.info("Error : " + pe); + cli.printUsage(); + return 1; + } + + } + + /** + * Submit a pipes job based on the command line arguments. 
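+   *
+   * For example (option names as registered in run() above; paths are
+   * placeholders):
+   *
+   * <pre>
+   * bin/hama pipes -conf pipesjob.xml -input /data/in -output /data/out \
+   *     -cpubin hdfs:///apps/pipes/cpu-binary
+   * </pre>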
+ * + * @param args + */ + public static void main(String[] args) throws Exception { + int exitCode = new Submitter().run(args); + System.exit(exitCode); + } + +} Index: core/src/main/java/org/apache/hama/pipes/PipesBSP.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesBSP.java (Revision 0) +++ core/src/main/java/org/apache/hama/pipes/PipesBSP.java (Arbeitskopie) @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.pipes; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; + +/** + * A BSP that can communicate via pipes with other programming languages and + * runtimes. + */ +public class PipesBSP + extends BSP { + + private static final Log LOG = LogFactory.getLog(PipesBSP.class); + private Application application; + + @Override + public void setup(BSPPeer peer) + throws IOException, SyncException, InterruptedException { + + this.application = new Application(peer); + application.getDownlink().runSetup(false, false); + } + + @Override + public void bsp(BSPPeer peer) + throws IOException, SyncException, InterruptedException { + + application.getDownlink().runBsp(false, false); + } + + @Override + public void cleanup(BSPPeer peer) + throws IOException { + + application.getDownlink().runCleanup(false, false); + + try { + application.waitForFinish(); + } catch (IOException e) { + LOG.error(e); + throw e; + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + application.cleanup(); + } + } + +} Index: core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (Revision 0) +++ core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (Arbeitskopie) @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
Index: core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java	(Revision 0)
+++ core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java	(Arbeitskopie)
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.RecordReader;
+import org.apache.hama.bsp.TextInputFormat;
+
+/**
+ * Dummy input format used when a non-Java {@link RecordReader} is used by the
+ * Pipes' application.
+ *
+ * The only useful thing this does is set up the BSP job to get the
+ * {@link PipesDummyRecordReader}; everything else is left to the 'actual'
+ * InputFormat specified by the user, which is given by
+ * hama.pipes.user.inputformat.
+ *
+ * Adapted from Hadoop Pipes.
+ *
+ */
+public class PipesNonJavaInputFormat implements
+    InputFormat<FloatWritable, NullWritable> {
+
+  @Override
+  public RecordReader<FloatWritable, NullWritable> getRecordReader(
+      InputSplit genericSplit, BSPJob job) throws IOException {
+    return new PipesDummyRecordReader(job.getConf(), genericSplit);
+  }
+
+  @Override
+  public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
+    // Delegate the generation of input splits to the 'original' InputFormat
+    return ReflectionUtils.newInstance(
+        job.getConf().getClass("hama.pipes.user.inputformat",
+            TextInputFormat.class, InputFormat.class), job.getConf())
+        .getSplits(job, numSplits);
+  }
+
+  /**
+   * A dummy {@link RecordReader} to help track the
+   * progress of Hama Pipes' applications when they are using a non-Java
+   * RecordReader.
+   *
+   * The PipesDummyRecordReader is informed of the 'progress' of
+   * the task by the {@link OutputHandler#progress(float)} which calls the
+   * {@link #next(FloatWritable, NullWritable)} with the progress as the
+   * key.
+   */
+  static class PipesDummyRecordReader implements
+      RecordReader<FloatWritable, NullWritable> {
+    float progress = 0.0f;
+
+    public PipesDummyRecordReader(Configuration job, InputSplit split)
+        throws IOException {
+    }
+
+    @Override
+    public FloatWritable createKey() {
+      return null;
+    }
+
+    @Override
+    public NullWritable createValue() {
+      return null;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+    }
+
+    @Override
+    public synchronized long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public float getProgress() {
+      return progress;
+    }
+
+    @Override
+    public synchronized boolean next(FloatWritable key, NullWritable value)
+        throws IOException {
+      progress = key.get();
+      return true;
+    }
+  }
+}
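The dummy reader's contract is easiest to see in isolation: the framework never receives real records from it, only a progress value smuggled in as the key. An illustration (conf and split stand in for whatever the framework passes):

    PipesDummyRecordReader reader = new PipesDummyRecordReader(conf, split);
    reader.next(new FloatWritable(0.42f), NullWritable.get()); // child reports 42 %
    float p = reader.getProgress();                            // yields 0.42f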
Index: core/src/main/java/org/apache/hama/pipes/SimpleStreamingRunner.java
===================================================================
--- core/src/main/java/org/apache/hama/pipes/SimpleStreamingRunner.java	(Revision 0)
+++ core/src/main/java/org/apache/hama/pipes/SimpleStreamingRunner.java	(Arbeitskopie)
@@ -0,0 +1,64 @@
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.NullInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+
+public class SimpleStreamingRunner {
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("hama.streaming.enabled", "true");
+    conf.set("bsp.local.tasks.maximum", "1");
+    conf.set("hama.pipes.executable.interpretor", "python");
+    conf.set("hama.pipes.command-file.keep", "true");
+    conf.set("hama.pipes.executable",
+        "/Users/thomas.jungblut/PycharmProjects/HamaStreaming/BSPRunner.py");
+    conf.set("hama.pipes.executable.args", "HelloWorldBSP");
+    FileSystem fs = FileSystem.get(conf);
+    DistributedCache.setCacheFiles(
+        getCachedFiles(fs,
+            "/Users/thomas.jungblut/PycharmProjects/HamaStreaming/*.py",
+            conf.get("hama.pipes.executable")), conf);
+
+    BSPJob job = new BSPJob(new HamaConfiguration(conf));
+    job.setBspClass(PipesBSP.class);
+    job.setInputFormat(NullInputFormat.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputPath(new Path("/tmp/pipestmp"));
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.waitForCompletion(true);
+  }
+
+  static URI[] getCachedFiles(FileSystem fs, String root, String filter)
+      throws IOException {
+    FileStatus[] listStatus = fs.globStatus(new Path(root));
+    ArrayList<URI> list = new ArrayList<URI>();
+    try {
+      list.add(new URI(filter));
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+
+    for (FileStatus f : listStatus) {
+      // compare file names: filter is a full path, getName() is just the name
+      if (!f.isDir() && !f.getPath().getName().equals(new Path(filter).getName()))
+        list.add(f.getPath().toUri());
+    }
+
+    return list.toArray(new URI[list.size()]);
+  }
+
+}
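Note the ordering contract in getCachedFiles: the executable (the filter argument) is added first so that it lands at index 0 of the cache files, which is where the pipes launcher expects the binary. An illustrative call with placeholder paths:

    URI[] cached = getCachedFiles(fs, "/apps/streaming/*.py",
        "/apps/streaming/BSPRunner.py");
    DistributedCache.setCacheFiles(cached, conf); // BSPRunner.py first, then the rest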
Index: core/src/main/java/org/apache/hama/bsp/TaskLog.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/TaskLog.java	(Revision 1383877)
+++ core/src/main/java/org/apache/hama/bsp/TaskLog.java	(Arbeitskopie)
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hama.HamaConfiguration;
@@ -261,25 +262,41 @@
       mergedCmd.append(tailCommand);
       mergedCmd.append(" -c ");
       mergedCmd.append(tailLength);
-      mergedCmd.append(" >> ");
+      mergedCmd.append(" > ");
       mergedCmd.append(stdout);
       mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
       mergedCmd.append(tailCommand);
       mergedCmd.append(" -c ");
       mergedCmd.append(tailLength);
-      mergedCmd.append(" >> ");
+      mergedCmd.append(" > ");
       mergedCmd.append(stderr);
       mergedCmd.append(" ; exit $PIPESTATUS");
     } else {
-      mergedCmd.append(" 1>> ");
+      mergedCmd.append(" 1> ");
       mergedCmd.append(stdout);
-      mergedCmd.append(" 2>> ");
+      mergedCmd.append(" 2> ");
       mergedCmd.append(stderr);
     }
     result.add(mergedCmd.toString());
     return result;
   }
+
+  /**
+   * Wrap a command so that its combined stdout and stderr are piped through
+   * tee into the stdout log file while still reaching the console.
+   */
+  public static List<String> captureOutAndErrorTee(List<String> setup,
+      List<String> cmd, File stdoutFilename, File stderrFilename,
+      long tailLength) throws IOException {
+    String stdout = FileUtil.makeShellPath(stdoutFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    StringBuilder mergedCmd = new StringBuilder();
+
+    mergedCmd.append(addCommand(cmd, true));
+    mergedCmd.append(" 2>&1 | tee " + stdout);
+
+    result.add(mergedCmd.toString());
+    return result;
+  }
+
   /**
    * Add quotes to each of the command strings and return as a single string
    *
@@ -346,4 +363,14 @@
     return result;
   }
 
+  /**
+   * Get the desired maximum length of task's logs.
+   *
+   * @param conf the configuration to look in
+   * @return the number of bytes to cap the log files at
+   */
+  public static long getTaskLogLength(Configuration conf) {
+    return conf.getLong("bsp.userlog.limit.kb", 100) * 1024;
+  }
+
 } // TaskLog
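Two effects of this hunk are worth spelling out. First, the redirections change from append (>>) to truncate (>), so a relaunched task starts with fresh log files. Second, the new tee variant keeps the child's merged output visible on the launching terminal while still writing the stdout log. A sketch of a call site (cmd, stdoutFile, and stderrFile are placeholders):

    List<String> wrapped = TaskLog.captureOutAndErrorTee(null, cmd, stdoutFile,
        stderrFile, TaskLog.getTaskLogLength(conf));
    // wrapped is roughly: ["bash", "-c", "'<cmd ...>' 2>&1 | tee <stdoutFile>"]
    // note that stderrFile and the length cap are accepted but currently unused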
Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java	(Revision 1383877)
+++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java	(Arbeitskopie)
@@ -58,10 +58,7 @@
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
-    IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
-    TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
-    COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;
@@ -96,7 +93,7 @@
   private FaultTolerantPeerService<M> faultToleranceService;
 
   private long splitSize = 0L;
-  
+
   /**
    * Protected default constructor for LocalBSPRunner.
    */
@@ -232,6 +229,9 @@
         }
       }
     }
+
+    initialize();
+
     doFirstSync(superstep);
 
     if (LOG.isDebugEnabled()) {
@@ -348,7 +348,7 @@
   public long getSplitSize() {
     return splitSize;
   }
-  
+
   /**
   * @return the position in the input stream.
   */
@@ -356,7 +356,7 @@
  public long getPos() throws IOException {
    return in.getPos();
  }
-  
+
  public final void initilizeMessaging() throws ClassNotFoundException {
    messenger = MessageManagerFactory.getMessageManager(conf);
    messenger.init(taskId, this, conf, peerAddress);
@@ -463,7 +463,7 @@
    messenger.clearOutgoingQueues();
 
    leaveBarrier();
-    
+
    incrementCounter(PeerCounter.TIME_IN_SYNC_MS,
        (System.currentTimeMillis() - startBarrier));
    incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);
@@ -479,7 +479,7 @@
    }
 
    umbilical.statusUpdate(taskId, currentTaskStatus);
-    
+
  }
 
  private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
@@ -702,4 +702,9 @@
    }
  }
 
+  @Override
+  public TaskAttemptID getTaskId() {
+    return taskId;
+  }
+
 }
Index: core/src/main/java/org/apache/hama/bsp/BSPPeer.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPPeer.java	(Revision 1383877)
+++ core/src/main/java/org/apache/hama/bsp/BSPPeer.java	(Arbeitskopie)
@@ -27,10 +27,9 @@
 import org.apache.hama.util.KeyValuePair;
 
 /**
- * BSP communication interface.
- * Reads key-value inputs, with K1 typed keys and V1 typed values.
- * Collects key-value outputs, with k2 typed keys and V2 typed values.
- * Exchange messages with other {@link BSPPeer}s via messages of type M.
+ * BSP communication interface. Reads key-value inputs, with K1 typed keys and
+ * V1 typed values. Collects key-value outputs, with K2 typed keys and V2 typed
+ * values. Exchanges messages with other {@link BSPPeer}s via messages of type M.
  */
 public interface BSPPeer<K1, V1, K2, V2, M extends Writable> extends Constants {
 
@@ -186,10 +185,15 @@
    * @return the size of assigned split
    */
   public long getSplitSize();
-  
+
   /**
    * @return the current position of the file read pointer
    * @throws IOException
    */
   public long getPos() throws IOException;
+
+  /**
+   * @return the task id of this task.
+   */
+  public TaskAttemptID getTaskId();
 }
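With getTaskId() now part of the public BSPPeer contract, user code can identify its own task attempt without reaching into framework internals, for example to tag output records. A minimal sketch of a superstep using the new accessor (the type parameters are arbitrary placeholders):

    public void bsp(BSPPeer<NullWritable, NullWritable, Text, Text, BytesWritable> peer)
        throws IOException, SyncException, InterruptedException {
      TaskAttemptID id = peer.getTaskId(); // the accessor added by this patch
      peer.write(new Text("attempt"), new Text(id.toString()));
      peer.sync();
    }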