/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ant.mapred;

import org.apache.tools.ant.BuildException;
import org.apache.tools.ant.Task;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.File;

/**
 * Task to submit a job.
 */

public final class JobSubmitTask extends Task {

  /**
   * URI to the job tracker
   */
  private String jobTracker;

  /**
   * The user
   */
  private String user;


  private TaskDescriptor mapper, reducer, combiner;
  private String jobIdProperty, jobURLProperty;
  private File jar;
  private static final String MAPPER = "mapper";
  private static final String REDUCER = "reducer";
  private static final String COMBINER = "combiner";

  public void setJar(File jar) {
    this.jar = jar;
  }

  public void setUser(String user) {
    this.user = user;
  }

  public void setJobTracker(String jobTracker) {
    this.jobTracker = jobTracker;
  }

  public void addMapper(TaskDescriptor descriptor) {
    checkForOverwrites(MAPPER, mapper);
    mapper = descriptor;
  }

  public void addReducer(TaskDescriptor descriptor) {
    checkForOverwrites(REDUCER, reducer);
    reducer = descriptor;
  }

  public void addCombiner(TaskDescriptor descriptor) {
    checkForOverwrites(COMBINER, combiner);
    combiner = descriptor;
  }

  public void setJobIdProperty(String jobIdProperty) {
    this.jobIdProperty = jobIdProperty;
  }

  public void setJobURLProperty(String jobURLProperty) {
    this.jobURLProperty = jobURLProperty;
  }

  private void checkForOverwrites(String role, TaskDescriptor oldTD)
          throws BuildException {
    if (oldTD != null) {
      throw new BuildException("There already is a " + role + " defined ");
    }
  }

  /**
   * submits a job
   *
   * @throws BuildException if something goes wrong with the build.
   */
  @Override
  public void execute() throws BuildException {

    JobConf conf = new JobConf();
    validateAndSet(MAPPER, conf, "mapred.mapper.class",mapper, true);
    validateAndSet(REDUCER, conf, "mapred.reducer.class", reducer, true);
    validateAndSet(COMBINER, conf, "mapred.combiner.class", combiner, false);
    validateAndSet("job tracker", conf, "mapred.job.tracker", jobTracker);
    if(jar!=null) {
      if(!jar.exists()) {
        throw new BuildException("No JAR file "+jar);
      }
      conf.setJar(jar.getAbsolutePath());
    }

    try {
      JobClient jobClient = new JobClient(conf);
      RunningJob job = jobClient.submitJob(conf);
      JobID id = job.getID();
      String url = job.getTrackingURL();
      log("Submitted job " + id + " tracking URL: " + url);
      setAntProperty(jobIdProperty, id);
      setAntProperty(jobURLProperty, url);
    } catch (IOException e) {
      throw new BuildException("Failed to submit job: " + e, e);
    }
  }


  /**
   * If the name of a property is non-null, set it to the toString() value of
   * the given value parameter
   *
   * @param name  property to set, can be null
   * @param value object to set the property to the string value of
   */
  private void setAntProperty(String name, Object value) {
    if (name != null) {
      getProject().setProperty(name, value.toString());
    }
  }

  /**
   * Validate a value to set in the configuration file; throw a build
   *
   * @param role  role to use in error message
   * @param conf  configuration file to set
   * @param key   key to set in the configuration file
   * @param td task descriptor
   * @param required flag to say this is required
   * @throws BuildException if the validation fails
   */
  private void validateAndSet(String role, JobConf conf, String key,
                              TaskDescriptor td, boolean required) {
    String current = conf.get(key);
    if (td != null) {
      td.validate(role);
      if (current != null) {
        throw new BuildException("The " + role
                + " is attempting to overwrite the setting" + current
                + " with " + td.getClassname());
      }
      conf.set(key, td.getClassname());
    } else {
      if (required) {
        throw new BuildException("Missing " + role);
      }
    }
  }


  /**
   * Validate a value to set in the configuration file; throw a build
   *
   * @param role  role to use in error message
   * @param conf  configuration file to set
   * @param key   key to set in the configuration file
   * @param value value to bind to
   * @throws BuildException if the value is null
   */
  private void validateAndSet(String role, JobConf conf, String key,
                              String value) {
    if (value == null) {
      throw new BuildException("No " + role + " defined");
    }
    conf.set(key, value);
  }

  /**
   * This class describes the class to be used for a specific role in the
   * system. Nested classes are used to allow for feature creep in future,
   * namely specific customisations of configuration options
   */
  public static final class TaskDescriptor {
    private String classname;

    public String getClassname() {
      return classname;
    }

    public void setClassname(String classname) {
      this.classname = classname;
    }

    /**
     * Set a mapping with a check that it is not already defined
     *
     * @param conf configuration file
     * @param key  property to set
     * @throws BuildException if the configuration already has a value for that
     *                        key
     */
    public void set(Configuration conf, String key) {
      String existing = conf.get(key);
      if (existing != null) {
        throw new BuildException(
                "The property " + key + " is already set to "
                        + existing);
      }
    }

    /**
     * @return a string representation of the object.
     */
    @Override
    public String toString() {
      return classname == null ? "(unbound task descriptor)" : classname;
    }

    public void validate(String role) throws BuildException {
      if (classname == null || classname.length() == 0) {
        throw new BuildException(role + " has no class defined");
      }
    }

  }
}
