Index: src/java/org/apache/hama/graph/JobStatus.java =================================================================== --- src/java/org/apache/hama/graph/JobStatus.java (revision 884063) +++ src/java/org/apache/hama/graph/JobStatus.java (working copy) @@ -63,11 +63,11 @@ this(jobid, 0.0f, progress, cleanupProgress, runState); } - public JobStatus(JobID jobid, float setupProgress, float mapProgress, + public JobStatus(JobID jobid, float setupProgress, float progress, float cleanupProgress, int runState) { this.jobid = jobid; this.setupProgress = setupProgress; - this.progress = mapProgress; + this.progress = progress; this.cleanupProgress = cleanupProgress; this.runState = runState; } Index: src/java/org/apache/hama/graph/RecordWriter.java =================================================================== --- src/java/org/apache/hama/graph/RecordWriter.java (revision 0) +++ src/java/org/apache/hama/graph/RecordWriter.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.graph; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; + +/** + * RecordWriter writes the output <key, value> pairs + * to an output file. + + *

RecordWriter implementations write the job outputs to the + * {@link FileSystem}. + * + * @see OutputFormat + */ +public abstract class RecordWriter { + /** + * Writes a key/value pair. + * + * @param key the key to write. + * @param value the value to write. + * @throws IOException + */ + public abstract void write(K key, V value + ) throws IOException, InterruptedException; + + /** + * Close this RecordWriter to future operations. + * + * @param context the context of the task + * @throws IOException + */ + public abstract void close(TaskAttemptContext context + ) throws IOException, InterruptedException; +} Index: src/java/org/apache/hama/graph/InputSplit.java =================================================================== --- src/java/org/apache/hama/graph/InputSplit.java (revision 0) +++ src/java/org/apache/hama/graph/InputSplit.java (revision 0) @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.graph; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; + +/** + * InputSplit represents the data to be processed by an + * individual {@link Walker}. + * + *

Typically, it presents a byte-oriented view on the input and is the + * responsibility of {@link RecordReader} of the job to process this and present + * a record-oriented view. + * + * @see InputFormat + * @see RecordReader + */ +public abstract class InputSplit { + /** + * Get the size of the split, so that the input splits can be sorted by size. + * @return the number of bytes in the split + * @throws IOException + * @throws InterruptedException + */ + public abstract long getLength() throws IOException, InterruptedException; + + /** + * Get the list of nodes by name where the data for the split would be local. + * The locations do not need to be serialized. + * @return a new array of the node nodes. + * @throws IOException + * @throws InterruptedException + */ + public abstract + String[] getLocations() throws IOException, InterruptedException; +} \ No newline at end of file Index: src/java/org/apache/hama/graph/OutputCommitter.java =================================================================== --- src/java/org/apache/hama/graph/OutputCommitter.java (revision 0) +++ src/java/org/apache/hama/graph/OutputCommitter.java (revision 0) @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.graph; + +import java.io.IOException; + + + /** + * + * @see JobContext + * @see TaskAttemptContext + * + */ +public abstract class OutputCommitter { + /** + * For the framework to setup the job output during initialization + * + * @param jobContext Context of the job whose output is being written. + * @throws IOException if temporary output could not be created + */ + public abstract void setupJob(JobContext jobContext) throws IOException; + + /** + * For cleaning up the job's output after job completion + * + * @param jobContext Context of the job whose output is being written. + * @throws IOException + */ + public abstract void cleanupJob(JobContext jobContext) throws IOException; + + /** + * Sets up output for the task. + * + * @param taskContext Context of the task whose output is being written. + * @throws IOException + */ + public abstract void setupTask(TaskAttemptContext taskContext) + throws IOException; + + /** + * Check whether task needs a commit + * + * @param taskContext + * @return true/false + * @throws IOException + */ + public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException; + + /** + * To promote the task's temporary output to final output location + * + * The task's output is moved to the job's output directory. + * + * @param taskContext Context of the task whose output is being written. + * @throws IOException if commit is not + */ + public abstract void commitTask(TaskAttemptContext taskContext) + throws IOException; + + /** + * Discard the task output + * + * @param taskContext + * @throws IOException + */ + public abstract void abortTask(TaskAttemptContext taskContext) + throws IOException; +} Index: src/java/org/apache/hama/graph/JobContext.java =================================================================== --- src/java/org/apache/hama/graph/JobContext.java (revision 0) +++ src/java/org/apache/hama/graph/JobContext.java (revision 0) @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.graph; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; + +/** + * A read-only view of the job that is provided to the tasks while they + * are running. + */ +public class JobContext { + // Put all of the attribute names in here so that Job and JobContext are + // consistent. + protected static final String INPUT_FORMAT_CLASS_ATTR = + "angrapa.inputformat.class"; + protected static final String WALKER_CLASS_ATTR = "angrapa.walker.class"; + protected static final String OUTPUT_FORMAT_CLASS_ATTR = + "angrapa.outputformat.class"; + + protected final Configuration conf; + private final JobID jobId; + + public JobContext(Configuration conf, JobID jobId) { + this.conf = conf; + this.jobId = jobId; + } + + public Configuration getConfiguration() { + return conf; + } + + public JobID getJobID() { + return jobId; + } + + public Path getWorkingDirectory() throws IOException { + String name = conf.get("angrapa.working.dir"); + + if (name != null) { + return new Path(name); + } else { + try { + Path dir = FileSystem.get(conf).getWorkingDirectory(); + conf.set("angrapa.working.dir", dir.toString()); + return dir; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public Class getOutputKeyClass() { + return conf.getClass("angrapa.output.key.class", + LongWritable.class, Object.class); + } + + public Class getOutputValueClass() { + return conf.getClass("angrapa.output.value.class", Text.class, Object.class); + } + + public String getJobName() { + return conf.get("angrapa.job.name", ""); + } + + @SuppressWarnings("unchecked") + public Class> getInputFormatClass() + throws ClassNotFoundException { + return (Class>) + conf.getClass(INPUT_FORMAT_CLASS_ATTR, InputFormat.class); // TODO: To be corrected to an implemented class + } + + @SuppressWarnings("unchecked") + public Class> getOutputFormatClass() + throws ClassNotFoundException { + return (Class>) + conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, OutputFormat.class); // TODO: To be corrected to an implemented class + } + + public RawComparator getSortComparator() { + return null; + } + + public String getJar() { + return conf.get("walker.jar"); + } +} Index: src/java/org/apache/hama/graph/OutputFormat.java =================================================================== --- src/java/org/apache/hama/graph/OutputFormat.java (revision 0) +++ src/java/org/apache/hama/graph/OutputFormat.java (revision 0) @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.graph; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; + +/** + * OutputFormat describes the output-specification for a + * Angrapa job. + * + *

The Angrapa framework relies on the OutputFormat of the + * job to:

+ *

    + *
  1. + * Validate the output-specification of the job. For e.g. check that the + * output directory doesn't already exist. + *
  2. + * Provide the {@link RecordWriter} implementation to be used to write out + * the output files of the job. Output files are stored in a + * {@link FileSystem}. + *
  3. + *
+ * + * @see RecordWriter + */ +public abstract class OutputFormat { + + /** + * Get the {@link RecordWriter} for the given task. + * + * @param context the information about the current task. + * @return a {@link RecordWriter} to write the output for the job. + * @throws IOException + */ + public abstract RecordWriter + getRecordWriter(TaskAttemptContext context + ) throws IOException, InterruptedException; + + /** + * Check for validity of the output-specification for the job. + * + *

This is to validate the output specification for the job when it is + * a job is submitted. Typically checks that it does not already exist, + * throwing an exception when it already exists, so that output is not + * overwritten.

+ * + * @param context information about the job + * @throws IOException when output should not be attempted + */ + public abstract void checkOutputSpecs(JobContext context + ) throws IOException, + InterruptedException; + + /** + * Get the output committer for this output format. This is responsible + * for ensuring the output is committed correctly. + * @param context the task context + * @return an output committer + * @throws IOException + * @throws InterruptedException + */ + public abstract + OutputCommitter getOutputCommitter(TaskAttemptContext context + ) throws IOException, InterruptedException; +} + Index: src/java/org/apache/hama/graph/RecordReader.java =================================================================== --- src/java/org/apache/hama/graph/RecordReader.java (revision 0) +++ src/java/org/apache/hama/graph/RecordReader.java (revision 0) @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.graph; + +import java.io.Closeable; +import java.io.IOException; + +/** + * The record reader breaks the data into a vertex for input to the + * {@link Walker}. + * @param + * @param + */ +public abstract class RecordReader implements Closeable { + + /** + * Called once at initialization. + * @param split the split that defines the range of records to read + * @throws IOException + * @throws InterruptedException + */ + public abstract void initialize(InputSplit split) throws IOException, InterruptedException; + + /** + * Read the next record + * @return true if a vertex was read + * @throws IOException + * @throws InterruptedException + */ + public abstract + boolean nextKeyValue() throws IOException, InterruptedException; + + /** + * Get the current key + * @return the current key or null if there is no current key + * @throws IOException + * @throws InterruptedException + */ + public abstract + KEYIN getCurrentKey() throws IOException, InterruptedException; + + /** + * Get the current value + * @return the object that was read + * @throws IOException + * @throws InterruptedException + */ + public abstract + VALUEIN getCurrentValue() throws IOException, InterruptedException; + + /** + * Get a value by the given key. It provides random access. + * @param key + * @return the value by the given key + * @throws IOException + * @throws InterruptedException + */ + public abstract + VALUEIN getValueByKey(KEYIN key) throws IOException, InterruptedException; + + /** + * The current progress of the record reader through its data. + * @return a number between 0.0 and 1.0 that is the fraction of the data read + * @throws IOException + * @throws InterruptedException + */ + public abstract float getProgress() throws IOException, InterruptedException; + + /** + * Close the record reader. + */ + public abstract void close() throws IOException; +} \ No newline at end of file Index: src/java/org/apache/hama/graph/InputFormat.java =================================================================== --- src/java/org/apache/hama/graph/InputFormat.java (revision 0) +++ src/java/org/apache/hama/graph/InputFormat.java (revision 0) @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.graph; + +import java.io.IOException; + +import java.util.List; + +import org.apache.hadoop.mapreduce.InputSplit; + +public abstract class InputFormat { + + /** + * Logically split the set of input files for the job. + * + *

Each {@link InputSplit} is then assigned to an individual {@link Walker} + * for processing.

+ * + *

Note: The split is a logical split of the inputs and the + * input files are not physically split into chunks. For e.g. a split could + * be <input-file-path, start, offset> tuple. The InputFormat + * also creates the {@link RecordReader} to read the {@link InputSplit}. + * + * @param context job configuration. + * @return an array of {@link InputSplit}s for the job. + */ + public abstract + List getSplits(JobContext context + ) throws IOException, InterruptedException; + + /** + * Create a record reader for a given split. The framework will call + * {@link RecordReader#initialize(InputSplit)} before + * the split is used. + * @param split the split to be read + * @return a new record reader + * @throws IOException + * @throws InterruptedException + */ + public abstract + RecordReader createRecordReader(InputSplit split + ) throws IOException,InterruptedException; + +} + Index: src/java/org/apache/hama/graph/TaskAttemptContext.java =================================================================== --- src/java/org/apache/hama/graph/TaskAttemptContext.java (revision 0) +++ src/java/org/apache/hama/graph/TaskAttemptContext.java (revision 0) @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.graph; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Progressable; + +/** + * The context for task attempts. + */ +public class TaskAttemptContext extends JobContext implements Progressable { + private final TaskAttemptID taskId; + private String status = ""; + + public TaskAttemptContext(Configuration conf, + TaskAttemptID taskId) { + super(conf, taskId.getJobID()); + this.taskId = taskId; + } + + /** + * Get the unique name for this task attempt. + */ + public TaskAttemptID getTaskAttemptID() { + return taskId; + } + + /** + * Set the current status of the task to the given string. + */ + public void setStatus(String msg) throws IOException { + status = msg; + } + + /** + * Get the last set status message. + * @return the current status message + */ + public String getStatus() { + return status; + } + + /** + * Report progress. The subtypes actually do work in this method. + */ + public void progress() { + } +} \ No newline at end of file