Index: src/java/org/apache/hadoop/mapred/lib/database/DatabaseInputSplit.java
===================================================================
--- src/java/org/apache/hadoop/mapred/lib/database/DatabaseInputSplit.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/lib/database/DatabaseInputSplit.java	(revision 0)
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.database;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A InputSplit that spans a set of rows
+ */
+public class DatabaseInputSplit implements InputSplit {
+  private int end = 0;
+
+  private int start = 0;
+
+  /**
+   * Default Constructor
+   */
+  public DatabaseInputSplit() {
+
+  }
+
+  /**
+   * Convenient Constructor
+   * 
+   * @param start
+   *                The index of the first row to select
+   * @param end
+   *                The index of the last row to select
+   */
+  public DatabaseInputSplit(int start, int end) {
+    this.start = start;
+    this.end = end;
+  }
+
+  /**
+   * @return The index of the last row to select
+   */
+  public int getEnd() {
+    return end;
+  }
+
+  /**
+   * @return The total row count in this split
+   */
+  public long getLength() throws IOException {
+    return end - start;
+  }
+
+  /** {@inheritDoc} */
+  public String[] getLocations() throws IOException {
+    // TODO Add a layer to enable SQL "sharding" and support locality
+    return new String[] {};
+  }
+
+  /**
+   * @return The index of the first row to select
+   */
+  public int getStart() {
+    return start;
+  }
+
+  /** {@inheritDoc} */
+  public void readFields(DataInput input) throws IOException {
+    start = input.readInt();
+    end = input.readInt();
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(start);
+    output.writeInt(end);
+  }
+}
Index: src/java/org/apache/hadoop/mapred/lib/database/DatabaseField.java
===================================================================
--- src/java/org/apache/hadoop/mapred/lib/database/DatabaseField.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/lib/database/DatabaseField.java	(revision 0)
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.database;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a mapping between a MapReduce value and JDBC field, mainly
+ * introduced in order to correctly read/write different types from/to the
+ * database.
+ * 
+ * TODO Should we drop the concept of mapping fields and just infer the type
+ * based on the writable (Text,IntWritable etc) and the field
+ * (ResultSetMetadata)?
+ */
+public class DatabaseField {
+  /**
+   * Deserialize a string of properties into an array
+   * 
+   * @param string
+   *                The serialized string
+   * @return The deserialized array
+   */
+  public static DatabaseField[] deserialize(String string) {
+    ArrayList<DatabaseField> fields = new ArrayList<DatabaseField>();
+
+    for (String field : string.split(";"))
+      fields.add(parse(field));
+
+    return fields.toArray(new DatabaseField[fields.size()]);
+  }
+
+  /**
+   * Parse a single serialized field into it's object representation
+   * 
+   * @param field
+   *                The serialized field
+   * @return The field
+   */
+  public static DatabaseField parse(String field) {
+    int i = field.indexOf(":");
+
+    if (i == -1)
+      throw new IllegalArgumentException("Field type not found");
+
+    String type = field.substring(0, i);
+    String name = field.substring(i + 1, field.length());
+
+    if (type.equals(DatabaseFieldType.TEXT.toString().toLowerCase())) {
+      return new DatabaseField(name, DatabaseFieldType.TEXT);
+    } else if (type.equals(DatabaseFieldType.INTEGER.toString().toLowerCase())) {
+      return new DatabaseField(name, DatabaseFieldType.INTEGER);
+    } else if (type.equals(DatabaseFieldType.LONG.toString().toLowerCase())) {
+      return new DatabaseField(name, DatabaseFieldType.LONG);
+    } else if (type.equals(DatabaseFieldType.DOUBLE.toString().toLowerCase())) {
+      return new DatabaseField(name, DatabaseFieldType.DOUBLE);
+    } else if (type.equals(DatabaseFieldType.BINARY.toString().toLowerCase())) {
+      return new DatabaseField(name, DatabaseFieldType.BINARY);
+    } else {
+      throw new IllegalArgumentException("Unknown field type");
+    }
+  }
+
+  /**
+   * Serialize an array of fields into a string
+   * 
+   * @param fields
+   *                The fields to serialize
+   * @return The serialized fields
+   */
+  public static String serialize(DatabaseField[] fields) {
+    StringBuilder buffer = new StringBuilder();
+
+    for (DatabaseField field : fields) {
+      if (buffer.length() > 0)
+	buffer.append(";");
+
+      buffer.append(field.toString());
+    }
+
+    return buffer.toString();
+  }
+
+  private Text key;
+
+  private String name;
+
+  private DatabaseFieldType type;
+
+  /**
+   * Constructs a field with the default type of text
+   * 
+   * @param name
+   *                The name of the field
+   */
+  public DatabaseField(String name) {
+    this(name, DatabaseFieldType.TEXT);
+  }
+
+  /**
+   * Convenient constructor
+   * 
+   * @param name
+   *                The name of the field
+   * @param type
+   *                The type of the field
+   */
+  public DatabaseField(String name, DatabaseFieldType type) {
+    this.name = name;
+    this.type = type;
+  }
+
+  /**
+   * Get the key representation of the field name
+   * 
+   * @return The key
+   */
+  public Text getKey() {
+    if (key == null)
+      key = new Text(name);
+
+    return key;
+  }
+
+  /**
+   * @return The name of the field
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return The type of the field
+   */
+  public DatabaseFieldType getType() {
+    return type;
+  }
+
+  public String toString() {
+    return type.name().toLowerCase() + ":" + name;
+  }
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/mapred/lib/database/DatabaseFieldType.java
===================================================================
--- src/java/org/apache/hadoop/mapred/lib/database/DatabaseFieldType.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/lib/database/DatabaseFieldType.java	(revision 0)
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.database;
+
+/**
+ * Determines the type of a DatabaseField and how it's read and written to the
+ * database
+ */
+public enum DatabaseFieldType {
+  TEXT, INTEGER, LONG, DOUBLE, BINARY
+}
Index: src/java/org/apache/hadoop/mapred/lib/database/DatabaseConfiguration.java
===================================================================
--- src/java/org/apache/hadoop/mapred/lib/database/DatabaseConfiguration.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/lib/database/DatabaseConfiguration.java	(revision 0)
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.database;
+
+/**
+ * A container for configuration property names
+ */
+public class DatabaseConfiguration {
+
+  public static final String DRIVER_CLASS_PROPERTY = "jdbc.mapred.driver_class";
+  
+  public static final String INPUT_KEY_FIELD_PROPERTY = "jdbc.mapred.input_key";
+
+  public static final String INPUT_CONDITIONS_PROPERTY = "jdbc.mapred.input_conditions";
+
+  public static final String INPUT_FIELDS_PROPERTY = "jdbc.mapred.input_fields";
+
+  public static final String INPUT_TABLE_PROPERTY = "jdbc.mapred.input_table";
+
+  public static final String OUTPUT_KEY_FIELD_PROPERTY = "jdbc.mapred.output_key";
+
+  public static final String OUTPUT_FIELDS_PROPERTY = "jdbc.mapred.output_fields";
+
+  public static final String OUTPUT_TABLE_PROPERTY = "jdbc.mapred.output_table";
+
+  public static final String PASSWORD_PROPERTY = "jdbc.mapred.password";
+  
+  public static final String URL_PROPERTY = "jdbc.mapred.url";
+
+  public static final String USERNAME_PROPERTY = "jdbc.mapred.username";
+}
Index: src/java/org/apache/hadoop/mapred/lib/database/DatabaseInputFormat.java
===================================================================
--- src/java/org/apache/hadoop/mapred/lib/database/DatabaseInputFormat.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/lib/database/DatabaseInputFormat.java	(revision 0)
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.database;
+
+import java.io.IOException;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A InputFormat that reads input data from a SQL table
+ */
+public class DatabaseInputFormat implements InputFormat<Text, MapWritable>,
+    JobConfigurable {
+  /**
+   * A RecordReader that reads records from a SQL table
+   */
+  protected class DatabaseRecordReader implements
+      RecordReader<Text, MapWritable> {
+    private ResultSet results;
+
+    private PreparedStatement statement;
+
+    /**
+     * @param split
+     *                The InputSplit to read data for
+     */
+    protected DatabaseRecordReader(DatabaseInputSplit split) {
+      try {
+	StringBuilder query = new StringBuilder();
+	query.append("SELECT " + keyField.getName());
+
+	for (int i = 0; i < fields.length; i++)
+	  query.append(", " + fields[i].getName());
+
+	query.append(" FROM " + table + " ");
+	if (conditions != null && conditions.length() > 0)
+	  query.append("WHERE (" + conditions + ") ");
+
+	query.append("LIMIT " + split.getLength() + " OFFSET "
+	    + split.getStart() + ";");
+
+	statement = connection.prepareStatement(query.toString(),
+	    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+	statement.setFetchSize(Integer.MIN_VALUE);
+
+	results = statement.executeQuery();
+      } catch (Exception e) {
+	e.printStackTrace();
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      try {
+	results.close();
+	statement.close();
+      } catch (SQLException e) {
+	throw new IOException(e.getMessage());
+      }
+    }
+
+    /** {@inheritDoc} */
+    public Text createKey() {
+      return new Text();
+    }
+
+    /** {@inheritDoc} */
+    public MapWritable createValue() {
+      return new MapWritable();
+    }
+
+    /** {@inheritDoc} */
+    public long getPos() throws IOException {
+      // TODO: The following is not possible when using JDBC streaming
+
+      // try {
+      // return results.getRow();
+      // } catch (SQLException e) {
+      // throw new IOException(e.getMessage());
+      // }
+
+      return 0;
+    }
+
+    /** {@inheritDoc} */
+    public float getProgress() throws IOException {
+      // TODO: The following is not possible when using JDBC streaming
+
+      // try {
+      // return results.getRow() / split.getLength();
+      // } catch (SQLException e) {
+      // throw new IOException(e.getMessage());
+      // }
+
+      return 0;
+    }
+
+    /** {@inheritDoc} */
+    public boolean next(Text key, MapWritable value) throws IOException {
+      try {
+	if (!results.next())
+	  return false;
+
+	// Set the key field value as the output key value
+	key.set(results.getString(1));
+
+	value.clear();
+
+	// TODO Should we skip the explicit specification and infer the type
+	// depending on the type of field?
+	for (int i = 0; i < fields.length; i++) {
+	  if (fields[i].getType().equals(DatabaseFieldType.INTEGER)) {
+	    value.put(fields[i].getKey(), new IntWritable(results
+		.getInt(fields[i].getName())));
+	  } else if (fields[i].getType().equals(DatabaseFieldType.LONG)) {
+	    value.put(fields[i].getKey(), new LongWritable(results
+		.getLong(fields[i].getName())));
+	  } else if (fields[i].getType().equals(DatabaseFieldType.DOUBLE)) {
+	    value.put(fields[i].getKey(), new DoubleWritable(results
+		.getDouble(fields[i].getName())));
+	  } else if (fields[i].getType().equals(DatabaseFieldType.BINARY)) {
+	    Blob blob = results.getBlob(fields[i].getName());
+	    byte[] bytes = blob.getBytes(1, (int) blob.length());
+	    value.put(fields[i].getKey(), new BytesWritable(bytes));
+	  } else {
+	    value.put(fields[i].getKey(), new Text(results.getString(fields[i]
+		.getName())));
+	  }
+	}
+      } catch (Exception e) {
+	e.printStackTrace();
+      }
+
+      return true;
+    }
+  }
+
+  private String conditions;
+
+  private Connection connection;
+
+  private DatabaseField[] fields;
+
+  private DatabaseField keyField;
+
+  private String table;
+
+  /** {@inheritDoc} */
+  public void configure(JobConf job) {
+    try {
+      Class.forName(job.get(DatabaseConfiguration.DRIVER_CLASS_PROPERTY));
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+      return;
+    }
+
+    table = job.get(DatabaseConfiguration.INPUT_TABLE_PROPERTY);
+
+    keyField = DatabaseField.parse(job
+	.get(DatabaseConfiguration.INPUT_KEY_FIELD_PROPERTY));
+    fields = DatabaseField.deserialize(job
+	.get(DatabaseConfiguration.INPUT_FIELDS_PROPERTY));
+    conditions = job.get(DatabaseConfiguration.INPUT_CONDITIONS_PROPERTY);
+
+    try {
+      connection = DriverManager.getConnection(job
+	  .get(DatabaseConfiguration.URL_PROPERTY), job
+	  .get(DatabaseConfiguration.USERNAME_PROPERTY), job
+	  .get(DatabaseConfiguration.PASSWORD_PROPERTY));
+    } catch (SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /** {@inheritDoc} */
+  public RecordReader<Text, MapWritable> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter) throws IOException {
+    return new DatabaseRecordReader((DatabaseInputSplit) split);
+  }
+
+  /** {@inheritDoc} */
+  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
+    try {
+      Class.forName(job.get(DatabaseConfiguration.DRIVER_CLASS_PROPERTY));
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e.getMessage());
+    }
+
+    try {
+      StringBuilder query = new StringBuilder();
+      query.append("SELECT COUNT(" + keyField.getName() + ") FROM " + table);
+
+      if (conditions != null && conditions.length() > 0)
+	query.append(" WHERE " + conditions);
+
+      query.append(";");
+
+      Statement statement = connection.createStatement();
+
+      ResultSet results = statement.executeQuery(query.toString());
+      results.next();
+
+      int count = results.getInt(1);
+      int chunkSize = (int) (count / chunks);
+
+      results.close();
+      statement.close();
+
+      InputSplit[] splits = new InputSplit[chunks];
+
+      // Split the rows into n-number of chunks and adjust the last chunk
+      // accordingly
+      for (int i = 0; i < chunks; i++) {
+	DatabaseInputSplit split;
+
+	if ((i + 1) == chunks)
+	  split = new DatabaseInputSplit(i * chunkSize, count);
+	else
+	  split = new DatabaseInputSplit(i * chunkSize, (i * chunkSize)
+	      + chunkSize);
+
+	splits[i] = split;
+      }
+
+      return splits;
+    } catch (SQLException e) {
+      throw new IOException(e.getMessage());
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void validateInput(JobConf job) throws IOException {
+    // TODO Implement
+  }
+
+  /**
+   * Initializes the map-part of the job with the appropriate input settings
+   * 
+   * @param job
+   *                The job
+   * @param table
+   *                The table to read data from
+   * @param keyField
+   *                The field to use as row key (should be a primary key)
+   * @param fields
+   *                The fields to input
+   * @param conditions
+   *                The condition which to select data with, eg. '(updated >
+   *                20070101 AND length > 0)'
+   */
+  public static void setInput(JobConf job, String table,
+      DatabaseField keyField, DatabaseField[] fields, String conditions) {
+    job.setInputFormat(DatabaseInputFormat.class);
+
+    job.set(DatabaseConfiguration.INPUT_TABLE_PROPERTY, table);
+
+    job
+	.set(DatabaseConfiguration.INPUT_KEY_FIELD_PROPERTY, keyField
+	    .toString());
+    job.set(DatabaseConfiguration.INPUT_FIELDS_PROPERTY, DatabaseField
+	.serialize(fields));
+
+    if (conditions != null && conditions.length() > 0)
+      job.set(DatabaseConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
+  }
+}
Index: src/java/org/apache/hadoop/mapred/lib/database/DatabaseOutputFormat.java
===================================================================
--- src/java/org/apache/hadoop/mapred/lib/database/DatabaseOutputFormat.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/lib/database/DatabaseOutputFormat.java	(revision 0)
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.database;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A OutputFormat that sends the reduce output to a SQL table
+ */
+public class DatabaseOutputFormat implements OutputFormat<Text, MapWritable> {
+  /**
+   * A RecordWriter that writes the reduce output to a SQL table
+   */
+  protected static class DatabaseRecordWriter implements
+      RecordWriter<Text, MapWritable> {
+    private Connection connection;
+
+    private DatabaseField[] fields;
+
+    private PreparedStatement statement;
+
+    protected DatabaseRecordWriter(Connection connection,
+	PreparedStatement statement, DatabaseField[] fields) {
+      this.connection = connection;
+      this.statement = statement;
+      this.fields = fields;
+    }
+
+    /** {@inheritDoc} */
+    public void close(Reporter reporter) throws IOException {
+      try {
+	statement.close();
+	connection.close();
+      } catch (SQLException e) {
+	throw new IOException(e.getMessage());
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void write(Text key, MapWritable value) throws IOException {
+      try {
+	statement.clearParameters();
+	// Set the key value parameter
+	statement.setString(1, key.toString());
+
+	// Dual iterative setting of the parameters (required as the query has
+	// the ON DUPLICATE statement)
+	for (int j = 0; j < 2; j++) {
+	  for (int i = 0; i < fields.length; i++) {
+	    Writable fieldValue = value.get(fields[i].getKey());
+
+	    if (fieldValue == null)
+	      continue;
+
+	    if (fields[i].getType().equals(DatabaseFieldType.INTEGER)) {
+	      statement.setInt((j * fields.length) + i + 2,
+		  ((IntWritable) fieldValue).get());
+	    } else if (fields[i].getType().equals(DatabaseFieldType.LONG)) {
+	      statement.setLong((j * fields.length) + i + 2,
+		  ((LongWritable) fieldValue).get());
+	    } else if (fields[i].getType().equals(DatabaseFieldType.DOUBLE)) {
+	      statement.setDouble((j * fields.length) + i + 2,
+		  ((DoubleWritable) fieldValue).get());
+	    } else if (fields[i].getType().equals(DatabaseFieldType.BINARY)) {
+	      statement.setBinaryStream((j * fields.length) + i + 2,
+		  new ByteArrayInputStream(((BytesWritable) fieldValue).get()),
+		  -1);
+	    } else {
+	      statement.setString((j * fields.length) + i + 2,
+		  ((Text) fieldValue).toString());
+	    }
+	  }
+	}
+
+	// TODO Use the batch mechanism for inserts/updates.
+	statement.executeUpdate();
+      } catch (SQLException e) {
+	e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Constructs the query used as the prepared statement to insert data
+   */
+  protected static String constructQuery(String table, DatabaseField keyField,
+      DatabaseField[] fields) {
+    StringBuilder query = new StringBuilder();
+    query.append("INSERT INTO " + table + " (" + keyField.getName());
+
+    for (int i = 0; i < fields.length; i++) {
+      query.append(", ");
+      query.append(fields[i].getName());
+    }
+
+    query.append(") VALUES (?");
+
+    for (int i = 0; i < fields.length; i++) {
+      query.append(",?");
+    }
+
+    query.append(") ON DUPLICATE KEY UPDATE ");
+
+    for (int i = 0; i < fields.length; i++) {
+      if (i > 0)
+	query.append(", ");
+      query.append(fields[i].getName() + "=?");
+    }
+
+    query.append(";");
+
+    return query.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void checkOutputSpecs(FileSystem filesystem, JobConf job)
+      throws IOException {
+    // TODO Implement
+  }
+
+  /** {@inheritDoc} */
+  public RecordWriter<Text, MapWritable> getRecordWriter(FileSystem filesystem,
+      JobConf job, String name, Progressable progress) throws IOException {
+    try {
+      Class.forName(job.get(DatabaseConfiguration.DRIVER_CLASS_PROPERTY));
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e.getMessage());
+    }
+
+    DatabaseField keyField = DatabaseField.parse(job
+	.get(DatabaseConfiguration.OUTPUT_KEY_FIELD_PROPERTY));
+    DatabaseField[] fields = DatabaseField.deserialize(job
+	.get(DatabaseConfiguration.OUTPUT_FIELDS_PROPERTY));
+
+    Connection connection = null;
+    PreparedStatement statement = null;
+
+    try {
+      connection = DriverManager.getConnection(job
+	  .get(DatabaseConfiguration.URL_PROPERTY), job
+	  .get(DatabaseConfiguration.USERNAME_PROPERTY), job
+	  .get(DatabaseConfiguration.PASSWORD_PROPERTY));
+
+      statement = connection.prepareStatement(constructQuery(job
+	  .get(DatabaseConfiguration.OUTPUT_TABLE_PROPERTY), keyField, fields));
+    } catch (SQLException e) {
+      throw new IOException(e.getMessage());
+    }
+
+    return new DatabaseRecordWriter(connection, statement, fields);
+  }
+
+  /**
+   * Initializes the reduce-part of the job with the appropriate output settings
+   * 
+   * @param job
+   *                The job
+   * @param table
+   *                The table to insert data into
+   * @param keyField
+   *                The field to use as row key (should be a primary key)
+   * @param fields
+   *                The fields to output
+   */
+  public static void setOutput(JobConf job, String table,
+      DatabaseField keyField, DatabaseField[] fields) {
+    job.setOutputFormat(DatabaseOutputFormat.class);
+    job.setReduceSpeculativeExecution(false);
+
+    job.set(DatabaseConfiguration.OUTPUT_TABLE_PROPERTY, table);
+    job.set(DatabaseConfiguration.OUTPUT_KEY_FIELD_PROPERTY, keyField
+	.toString());
+    job.set(DatabaseConfiguration.OUTPUT_FIELDS_PROPERTY, DatabaseField
+	.serialize(fields));
+  }
+}
