Index: src/java/org/apache/nutch/fetcher/Fetcher2.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher2.java	(revision 0)
+++ src/java/org/apache/nutch/fetcher/Fetcher2.java	(revision 0)
@@ -0,0 +1,849 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+// Commons Logging imports
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+
+import org.apache.lucene.util.PriorityQueue;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.net.*;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.*;
+
+
+/**
+ * The fetcher. Most of the work is done by plugins.
+ *
+ * <p>Robots.txt rules and per-host politeness (crawl delay and
+ * <code>fetcher.threads.per.host</code>) are enforced by this class rather
+ * than by the protocol plugins. A URL whose host is busy is parked in a
+ * per-host queue, ordered by the earliest time it may be fetched, and is
+ * picked up again by a fetcher thread once that time has passed.</p>
+ */
+public class Fetcher2 extends Configured implements MapRunnable {
+
+  public static final Log LOG = LogFactory.getLog(Fetcher2.class);
+
+  public static final String SIGNATURE_KEY = "nutch.content.digest";
+  public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
+  public static final String SCORE_KEY = "nutch.crawl.score";
+
+  /** Initial capacity of a per-host queue; it is grown when full. */
+  private static final int INITIAL_HOST_QUEUE_SIZE = 500;
+
+  public static class InputFormat extends SequenceFileInputFormat {
+    /** Don't split inputs, to keep things polite. */
+    public FileSplit[] getSplits(FileSystem fs, JobConf job, int nSplits)
+      throws IOException {
+      Path[] files = listPaths(fs, job);
+      FileSplit[] splits = new FileSplit[files.length];
+      for (int i = 0; i < files.length; i++) {
+        splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]));
+      }
+      return splits;
+    }
+  }
+
+  private RecordReader input;
+  private OutputCollector output;
+  private Reporter reporter;
+
+  /** Maps protocol+host to the {@link HostQueue} of URLs parked for it. */
+  private HashMap queues;
+  private boolean byIP;
+  private long serverDelay;
+  /** Total number of entries in all host queues. */
+  private int queueSize;
+  /** Read from <code>http.max.delays</code> but currently unused. */
+  private int maxDelays;
+  private int maxThreadsPerHost;
+
+  /**
+   * Maps from host to a Long naming the time it should be unblocked.
+   * The Long is zero while the host is in use by the maximum number of
+   * threads, then set to now+wait when the last request finishes. This way
+   * at most <code>fetcher.threads.per.host</code> threads access a host.
+   * All three static structures below are guarded by this map's monitor.
+   */
+  private static HashMap BLOCKED_ADDR_TO_TIME = new HashMap();
+
+  /**
+   * Maps a host to the number of threads accessing that host.
+   */
+  private static HashMap THREADS_PER_HOST_COUNT = new HashMap();
+
+  /**
+   * Hosts waiting for their crawl delay to expire. This contains all of the
+   * non-zero entries from BLOCKED_ADDR_TO_TIME, most recently added first.
+   * Hosts may use different crawl delays, so it is not sorted by time.
+   */
+  private static LinkedList BLOCKED_ADDR_QUEUE = new LinkedList();
+
+
+  private String segmentName;
+  private int activeThreads;
+  private int maxRedirect;
+
+  private long start = System.currentTimeMillis(); // start time of fetcher run
+  private long lastRequestStart = start;
+
+  private long bytes;                             // total bytes fetched
+  private int pages;                              // total pages fetched
+  private int errors;                             // total pages errored
+
+  private boolean storingContent;
+  private boolean parsing;
+
+  /**
+   * A worker thread: takes URLs from the host queues or the input, checks
+   * robots.txt and politeness, fetches, follows redirects and emits output.
+   */
+  private class FetcherThread extends Thread {
+    private Configuration conf;
+    private URLFilters urlFilters;
+    private ScoringFilters scfilters;
+    private ParseUtil parseUtil;
+    private UrlNormalizer normalizer;
+    private ProtocolFactory protocolFactory;
+    /** Cleared once this thread has seen the end of the input. */
+    private boolean hasMoreInput = true;
+
+    public FetcherThread(Configuration conf) {
+      this.setDaemon(true);                       // don't hang JVM on exit
+      this.setName("FetcherThread");              // use an informative name
+      this.conf = conf;
+      this.urlFilters = new URLFilters(conf);
+      this.scfilters = new ScoringFilters(conf);
+      this.parseUtil = new ParseUtil(conf);
+      this.protocolFactory = new ProtocolFactory(conf);
+      this.normalizer = new UrlNormalizerFactory(conf).getNormalizer();
+    }
+
+    public void run() {
+      synchronized (Fetcher2.this) {activeThreads++;} // count threads
+
+      try {
+        UTF8 key = new UTF8();
+        CrawlDatum datum = new CrawlDatum();
+
+        while (true) {
+          // TODO : NUTCH-258 ...
+          // If something bad happened, then exit
+          // if (conf.getBoolean("fetcher.exit", false)) {
+          //   break;
+          // ]
+
+          // prefer a parked URL that is due; otherwise read the input
+          if (!getFromQueue(key, datum, false)) {
+            try {                                 // get next entry from input
+              if (!hasMoreInput || !input.next(key, datum)) {
+                hasMoreInput = false;
+                if (getQueueSize() > 0) {
+                  // input is exhausted but URLs are still parked: wait for them
+                  try {
+                    Thread.sleep(serverDelay);
+                  } catch (Exception e) {}
+                  continue;
+                } else {
+                  break;                          // at eof, exit
+                }
+              }
+            } catch (IOException e) {
+              if (LOG.isFatalEnabled()) {
+                e.printStackTrace(LogUtil.getFatalStream(LOG));
+                LOG.fatal("fetcher caught:"+e.toString());
+              }
+              break;
+            }
+          }
+
+          synchronized (Fetcher2.this) {
+            lastRequestStart = System.currentTimeMillis();
+          }
+
+          // url may be changed through redirects.
+          UTF8 url = new UTF8();
+          url.set(key);
+          try {
+            if (LOG.isInfoEnabled()) { LOG.info("fetching " + url); }
+
+            // fetch the page
+            boolean redirecting;
+            int redirectCount = 0;
+            do {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("redirectCount=" + redirectCount);
+              }
+              redirecting = false;
+              URL u = null;
+              try {
+                u = new URL(url.toString());
+              } catch (Exception e) {
+                LOG.warn("Cannot parse url: " + url, e);
+                continue;
+              }
+              String proto = u.getProtocol().toLowerCase();
+              String host;
+              if (byIP) {
+                try {
+                  InetAddress addr = InetAddress.getByName(u.getHost());
+                  host = addr.getHostAddress();
+                } catch (UnknownHostException e) {
+                  // unable to resolve it, so don't fall back to host name
+                  throw new Exception(e);
+                }
+              } else {
+                host = u.getHost();
+                if (host == null)
+                  throw new Exception("Unknown host for url: " + url);
+              }
+              host = host.toLowerCase();
+              String protoHost = proto + "://" + host;
+              Protocol protocol = this.protocolFactory.getProtocol(url.toString());
+              RobotRules rules = protocol.getRobotRules(url, datum);
+              if (!rules.isAllowed(u)) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("Denied by robots.txt: " + url);
+                }
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+                continue;
+              }
+              long delay = serverDelay;
+              if (rules.getCrawlDelay() > 0) delay = rules.getCrawlDelay();
+              // block address
+              long wait = blockAddr(protoHost, delay);
+              if (wait > 0) {
+                // host is busy: park the URL until the returned time
+                putToQueue(protoHost, url, datum, wait);
+                continue;
+              }
+              ProtocolOutput protocolOutput;
+              try {
+                protocolOutput = protocol.getProtocolOutput(url, datum);
+              } finally {
+                // release the host even if the protocol throws
+                unblockAddr(protoHost, delay);
+              }
+              ProtocolStatus status = protocolOutput.getStatus();
+              Content content = protocolOutput.getContent();
+              ParseStatus pstatus = null;
+
+              switch(status.getCode()) {
+
+              case ProtocolStatus.WOULDBLOCK:
+                putToQueue(protoHost, url, datum, status);
+                break;
+
+              case ProtocolStatus.SUCCESS:        // got a page
+                pstatus = output(url, datum, content, CrawlDatum.STATUS_FETCH_SUCCESS);
+                updateStatus(content.getContent().length);
+                if (pstatus != null && pstatus.isSuccess() &&
+                    pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+                  String newUrl = pstatus.getMessage();
+                  newUrl = normalizer.normalize(newUrl);
+                  newUrl = this.urlFilters.filter(newUrl);
+                  if (newUrl != null && !newUrl.equals(url.toString())) {
+                    url = new UTF8(newUrl);
+                    redirecting = true;
+                    redirectCount++;
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug(" - content redirect to " + url);
+                    }
+                  } else if (LOG.isDebugEnabled()) {
+                    LOG.debug(" - content redirect skipped: " +
+                             (newUrl != null ? "to same url" : "filtered"));
+                  }
+                }
+                break;
+
+              case ProtocolStatus.MOVED:         // redirect
+              case ProtocolStatus.TEMP_MOVED:
+                String newUrl = status.getMessage();
+                newUrl = normalizer.normalize(newUrl);
+                newUrl = this.urlFilters.filter(newUrl);
+                if (newUrl != null && !newUrl.equals(url.toString())) {
+                  url = new UTF8(newUrl);
+                  redirecting = true;
+                  redirectCount++;
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug(" - protocol redirect to " + url);
+                  }
+                } else if (LOG.isDebugEnabled()) {
+                  LOG.debug(" - protocol redirect skipped: " +
+                           (newUrl != null ? "to same url" : "filtered"));
+                }
+                break;
+
+              case ProtocolStatus.EXCEPTION:
+                logError(url, status.getMessage());
+                // fall through: an exception is treated as a retry
+              case ProtocolStatus.RETRY:          // retry
+                datum.setRetriesSinceFetch(datum.getRetriesSinceFetch()+1);
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
+                break;
+
+              case ProtocolStatus.GONE:           // gone
+              case ProtocolStatus.NOTFOUND:
+              case ProtocolStatus.ACCESS_DENIED:
+              case ProtocolStatus.ROBOTS_DENIED:
+              case ProtocolStatus.NOTMODIFIED:
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+                break;
+
+              default:
+                if (LOG.isWarnEnabled()) {
+                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+                }
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+              }
+
+              if (redirecting && redirectCount >= maxRedirect) {
+                if (LOG.isInfoEnabled()) {
+                  LOG.info(" - redirect count exceeded " + url);
+                }
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+              }
+
+            } while (redirecting && (redirectCount < maxRedirect));
+
+
+          } catch (Throwable t) {                 // unexpected exception
+            logError(url, t.toString());
+            output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
+
+          }
+        }
+
+      } catch (Throwable e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      } finally {
+        synchronized (Fetcher2.this) {activeThreads--;} // count threads
+      }
+    }
+
+    /** Logs a failed fetch of <code>url</code> and counts it as an error. */
+    private void logError(UTF8 url, String message) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("fetch of " + url + " failed with: " + message);
+      }
+      synchronized (Fetcher2.this) {               // record failure
+        errors++;
+      }
+    }
+
+    /**
+     * Sets the status and fetch time of <code>datum</code>, parses
+     * <code>content</code> when parsing is enabled and the fetch succeeded,
+     * and writes the result to the output collector.
+     *
+     * @return the parse status, or null if nothing was parsed
+     */
+    private ParseStatus output(UTF8 key, CrawlDatum datum,
+                               Content content, int status) {
+
+      datum.setStatus(status);
+      datum.setFetchTime(System.currentTimeMillis());
+
+      if (content == null) {
+        String url = key.toString();
+        content = new Content(url, url, new byte[0], "", new Metadata(), this.conf);
+      }
+      Metadata metadata = content.getMetadata();
+      // add segment to metadata
+      metadata.set(SEGMENT_NAME_KEY, segmentName);
+      // add score to content metadata so that ParseSegment can pick it up.
+      try {
+        scfilters.passScoreBeforeParsing(key, datum, content);
+      } catch (Exception e) {
+        if (LOG.isWarnEnabled()) {
+          e.printStackTrace(LogUtil.getWarnStream(LOG));
+          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+        }
+      }
+
+      Parse parse = null;
+      if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+        ParseStatus parseStatus;
+        try {
+          parse = this.parseUtil.parse(content);
+          parseStatus = parse.getData().getStatus();
+        } catch (Exception e) {
+          parseStatus = new ParseStatus(e);
+        }
+        if (!parseStatus.isSuccess()) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Error parsing: " + key + ": " + parseStatus);
+          }
+          parse = parseStatus.getEmptyParse(getConf());
+        }
+        // Calculate page signature. For non-parsing fetchers this will
+        // be done in ParseSegment
+        byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse);
+        metadata.set(SIGNATURE_KEY, StringUtil.toHexString(signature));
+        datum.setSignature(signature);
+        // Ensure segment name and score are in parseData metadata
+        parse.getData().getContentMeta().set(SEGMENT_NAME_KEY, segmentName);
+        parse.getData().getContentMeta().set(SIGNATURE_KEY, StringUtil.toHexString(signature));
+        try {
+          scfilters.passScoreAfterParsing(key, content, parse);
+        } catch (Exception e) {
+          if (LOG.isWarnEnabled()) {
+            e.printStackTrace(LogUtil.getWarnStream(LOG));
+            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+          }
+        }
+
+      }
+
+      try {
+        output.collect
+          (key,
+           new FetcherOutput(datum,
+                             storingContent ? content : null,
+                             parse != null ? new ParseImpl(parse) : null));
+      } catch (IOException e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      }
+      if (parse != null) return parse.getData().getStatus();
+      else return null;
+    }
+
+  }
+
+  public Fetcher2() { super(null); }
+
+  public Fetcher2(Configuration conf) { super(conf); }
+
+  /** Adds one fetched page of <code>bytesInPage</code> bytes to the totals. */
+  private synchronized void updateStatus(int bytesInPage) throws IOException {
+    pages++;
+    bytes += bytesInPage;
+  }
+
+  /**
+   * Attempts to block a combination of protocol + host (name or IP,
+   * depending on the config), taking into account the specified crawl delay
+   * between successive requests, and maximum number of concurrent requests.
+   *
+   * @param protoHost protocol + host (e.g. "http://www.example.com")
+   * @param crawlDelay number of milliseconds between consecutive requests
+   * @return if this returns 0, then address is blocked and ready to be
+   * fetched from (and needs to be unblocked later). If non-zero, then this
+   * value is the time when next attempt should be made.
+   * @throws ProtocolException
+   */
+  private long blockAddr(String protoHost, long crawlDelay) throws ProtocolException {
+
+    cleanExpiredServerBlocks();                   // free held addresses
+
+    Long time;
+    synchronized (BLOCKED_ADDR_TO_TIME) {
+      time = (Long) BLOCKED_ADDR_TO_TIME.get(protoHost);
+      if (time == null) {                         // address is free
+
+        // get # of threads already accessing this addr
+        Integer counter = (Integer)THREADS_PER_HOST_COUNT.get(protoHost);
+        int count = (counter == null) ? 0 : counter.intValue();
+
+        count++;                                  // increment & store
+        THREADS_PER_HOST_COUNT.put(protoHost, new Integer(count));
+
+        if (count >= maxThreadsPerHost) {
+          BLOCKED_ADDR_TO_TIME.put(protoHost, new Long(0)); // block it
+        }
+        return 0;
+      }
+    }
+
+    long done = time.longValue();
+    long now = System.currentTimeMillis();
+    long sleep = 0;
+    if (done == 0) {                              // address is still in use
+      sleep = crawlDelay;                         // wait at least delay
+
+    } else if (now < done) {                      // address is on hold
+      sleep = done - now;                         // wait until its free
+    }
+
+    return now + sleep;
+  }
+
+  /**
+   * Releases a host acquired by {@link #blockAddr}. When the last thread
+   * using it finishes, the host stays on hold for <code>crawlDelay</code>
+   * milliseconds.
+   */
+  private void unblockAddr(String host, long crawlDelay) {
+    synchronized (BLOCKED_ADDR_TO_TIME) {
+      int addrCount = ((Integer)THREADS_PER_HOST_COUNT.get(host)).intValue();
+      if (addrCount == 1) {
+        THREADS_PER_HOST_COUNT.remove(host);
+        BLOCKED_ADDR_QUEUE.addFirst(host);
+        BLOCKED_ADDR_TO_TIME.put
+          (host, new Long(System.currentTimeMillis() + crawlDelay));
+      } else {
+        THREADS_PER_HOST_COUNT.put(host, new Integer(addrCount - 1));
+      }
+    }
+  }
+
+  /**
+   * Frees every host whose crawl delay has expired. The whole queue is
+   * scanned because hosts may use different crawl delays, so it is not sorted
+   * by time; hosts still on hold are kept, so this never spins while holding
+   * the lock.
+   */
+  private static void cleanExpiredServerBlocks() {
+    synchronized (BLOCKED_ADDR_TO_TIME) {
+      long now = System.currentTimeMillis();
+      Iterator it = BLOCKED_ADDR_QUEUE.iterator();
+      while (it.hasNext()) {
+        String host = (String) it.next();
+        Long time = (Long) BLOCKED_ADDR_TO_TIME.get(host);
+        if (time == null) {
+          it.remove();                            // stale entry
+        } else if (time.longValue() != 0 && time.longValue() <= now) {
+          BLOCKED_ADDR_TO_TIME.remove(host);      // delay expired: free it
+          it.remove();
+        }
+        // otherwise the host is still on hold (0 means in use): keep it
+      }
+    }
+  }
+
+  /**
+   * Parks a URL whose protocol returned WOULDBLOCK. If the status message
+   * parses as a number it is the delay, in milliseconds, before the retry
+   * (see ProtocolStatus.WOULDBLOCK); otherwise the server delay is used.
+   */
+  private synchronized void putToQueue(String protoHost, UTF8 u, CrawlDatum datum,
+      ProtocolStatus status) throws Exception {
+    long delay = serverDelay;
+    String msg = (status == null) ? null : status.getMessage();
+    if (msg != null) {
+      try {
+        delay = Long.parseLong(msg);
+      } catch (NumberFormatException e) {
+        delay = serverDelay;
+      }
+    }
+    putToQueue(protoHost, u, datum, System.currentTimeMillis() + delay);
+  }
+
+  /**
+   * Parks a copy of <code>u</code> and <code>datum</code> in the queue of
+   * <code>protoHost</code> until <code>fetchTime</code> (absolute, in ms).
+   */
+  private synchronized void putToQueue(String protoHost, UTF8 u, CrawlDatum datum,
+      long fetchTime) throws Exception {
+    HostQueue hostQueue = (HostQueue) queues.get(protoHost);
+    if (hostQueue == null) {
+      hostQueue = new HostQueue(INITIAL_HOST_QUEUE_SIZE);
+      queues.put(protoHost, hostQueue);
+    } else if (hostQueue.size() >= hostQueue.getCapacity()) {
+      // a full Lucene PriorityQueue silently drops or replaces entries on
+      // insert(), which would lose URLs and corrupt queueSize: grow instead
+      hostQueue = hostQueue.grow();
+      queues.put(protoHost, hostQueue);
+    }
+    // fetcher threads reuse their url/datum objects, so keep private copies
+    UTF8 urlCopy = new UTF8();
+    urlCopy.set(u);
+    CrawlDatum datumCopy = new CrawlDatum();
+    datumCopy.set(datum);
+    hostQueue.insert(new HostQueueEntry(urlCopy, datumCopy, protoHost, fetchTime));
+    queueSize++;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PUT: protoHost=" + protoHost + ", url=" + u + ", size=" + queueSize);
+    }
+  }
+
+  /** A URL parked until <code>fetchTime</code> because its host was busy. */
+  private static class HostQueueEntry {
+    UTF8 url;
+    String protoHost;
+    CrawlDatum datum;
+    long fetchTime;
+
+    public HostQueueEntry(UTF8 url, CrawlDatum datum, String protoHost, long fetchTime) {
+      this.url = url;
+      this.datum = datum;
+      this.fetchTime = fetchTime;
+      this.protoHost = protoHost;
+    }
+  }
+
+  /**
+   * Entries ordered by increasing fetchTime; top() is the earliest. Tracks its
+   * own capacity so that callers can grow it before it fills up.
+   */
+  private static class HostQueue extends PriorityQueue {
+    private final int capacity;
+
+    public HostQueue(int size) {
+      this.capacity = size;
+      initialize(size);
+    }
+
+    /** Maximum number of entries this queue can hold. */
+    int getCapacity() {
+      return capacity;
+    }
+
+    /**
+     * Returns a queue with twice the capacity holding all entries of this
+     * one; this queue is left empty.
+     */
+    HostQueue grow() {
+      HostQueue bigger = new HostQueue(Math.max(1, capacity * 2));
+      while (size() > 0) {
+        bigger.insert(pop());
+      }
+      return bigger;
+    }
+
+    protected boolean lessThan(Object o1, Object o2) {
+      HostQueueEntry hqe1 = (HostQueueEntry)o1;
+      HostQueueEntry hqe2 = (HostQueueEntry)o2;
+      return hqe1.fetchTime < hqe2.fetchTime;
+    }
+
+  }
+
+  /**
+   * Takes the parked entry with the earliest fetch time across all hosts and
+   * copies it into <code>key</code> and <code>datum</code>.
+   *
+   * @param asap if true, take the earliest entry even if it is not due yet
+   * @return true if an entry was taken; false if there is none or (unless
+   * <code>asap</code>) the earliest one is not due yet
+   */
+  private synchronized boolean getFromQueue(UTF8 key, CrawlDatum datum, boolean asap) {
+    if (queueSize == 0) return false;
+    // collect the earliest entry of every host queue
+    HostQueue heads = new HostQueue(queues.size());
+    Iterator it = queues.values().iterator();
+    while (it.hasNext()) {
+      HostQueueEntry head = (HostQueueEntry) ((HostQueue) it.next()).top();
+      if (head != null) heads.insert(head);
+    }
+    if (heads.size() == 0) {
+      LOG.warn("GET: size=" + queueSize + " but tops count == 0");
+      return false;
+    }
+    HostQueueEntry hqe = (HostQueueEntry) heads.pop();
+    if (!asap && hqe.fetchTime >= System.currentTimeMillis()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("GET: not yet, protoHost=" + hqe.protoHost + ", url=" + hqe.url + ", size=" + queueSize);
+      }
+      return false;
+    }
+    key.set(hqe.url);
+    datum.set(hqe.datum);
+    HostQueue hq = (HostQueue) queues.get(hqe.protoHost);
+    hq.pop();                                     // hqe was the head of hq
+    if (hq.size() == 0) {
+      queues.remove(hqe.protoHost);               // don't keep idle hosts around
+    }
+    queueSize--;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("GET: protoHost=" + hqe.protoHost + ", url=" + hqe.url + ", size=" + queueSize);
+    }
+    return true;
+  }
+
+  /** Returns the total number of parked entries. */
+  private synchronized int getQueueSize() {
+    return queueSize;
+  }
+
+  private void reportStatus() throws IOException {
+    String status;
+    synchronized (this) {
+      long elapsed = (System.currentTimeMillis() - start)/1000;
+      status =
+        pages+" pages, "+errors+" errors, "
+        + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
+        + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";
+    }
+    reporter.setStatus(status);
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+
+    this.segmentName = job.get(SEGMENT_NAME_KEY);
+    this.storingContent = isStoringContent(job);
+    this.parsing = isParsing(job);
+    this.queues = new HashMap();
+    this.queueSize = 0;
+    this.maxDelays = job.getInt("http.max.delays", 3);
+    this.maxThreadsPerHost = job.getInt("fetcher.threads.per.host", 1);
+    // backward-compatible default setting
+    this.byIP = job.getBoolean("fetcher.threads.per.host.by.ip", true);
+    this.serverDelay = (long) (job.getFloat("fetcher.server.delay", 1.0f) * 1000);
+
+//    if (job.getBoolean("fetcher.verbose", false)) {
+//      LOG.setLevel(Level.FINE);
+//    }
+  }
+
+  public void close() {}
+
+  public static boolean isParsing(Configuration conf) {
+    return conf.getBoolean("fetcher.parse", true);
+  }
+
+  public static boolean isStoringContent(Configuration conf) {
+    return conf.getBoolean("fetcher.store.content", true);
+  }
+
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter) throws IOException {
+
+    this.input = input;
+    this.output = output;
+    this.reporter = reporter;
+
+    this.maxRedirect = getConf().getInt("http.redirect.max", 3);
+
+    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
+
+    for (int i = 0; i < threadCount; i++) {       // spawn threads
+      new FetcherThread(getConf()).start();
+    }
+
+    // select a timeout that avoids a task timeout
+    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+
+    boolean active;
+    do {                                          // wait for threads to exit
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+
+      reportStatus();
+
+      synchronized (this) {
+        // some requests seem to hang, despite all intentions
+        if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Aborting with "+activeThreads+" hung threads.");
+          }
+          return;
+        }
+        // read the thread count under the lock the threads update it with
+        active = activeThreads > 0;
+      }
+    } while (active);
+
+  }
+
+  public void fetch(Path segment, int threads, boolean parsing)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: starting");
+      LOG.info("Fetcher: segment: " + segment);
+    }
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("fetch " + segment);
+
+    job.setInt("fetcher.threads.fetch", threads);
+    job.set(SEGMENT_NAME_KEY, segment.getName());
+    job.setBoolean("fetcher.parse", parsing);
+
+    // for politeness, don't permit parallel execution of a single task
+    job.setSpeculativeExecution(false);
+
+    job.setInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+    job.setInputFormat(InputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+
+    job.setMapRunnerClass(Fetcher2.class);
+
+    job.setOutputPath(segment);
+    job.setOutputFormat(FetcherOutputFormat.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(FetcherOutput.class);
+
+    JobClient.runJob(job);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
+  }
+
+
+  /** Run the fetcher. */
+  public static void main(String[] args) throws Exception {
+
+    String usage = "Usage: Fetcher2 <segment> [-threads n] [-noParsing]";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    Path segment = new Path(args[0]);
+
+    Configuration conf = NutchConfiguration.create();
+
+    int threads = conf.getInt("fetcher.threads.fetch", 10);
+    boolean parsing = true;
+
+    for (int i = 1; i < args.length; i++) {       // parse command line
+      if (args[i].equals("-threads")) {           // found -threads option
+        threads = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-noParsing")) parsing = false;
+    }
+
+    conf.setInt("fetcher.threads.fetch", threads);
+    if (!parsing) {
+      conf.setBoolean("fetcher.parse", parsing);
+    }
+    Fetcher2 fetcher = new Fetcher2(conf);          // make a Fetcher
+
+    fetcher.fetch(segment, threads, parsing);       // run the Fetcher
+
+  }
+}

Property changes on: src/java/org/apache/nutch/fetcher/Fetcher2.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: src/java/org/apache/nutch/protocol/EmptyRobotRules.java
===================================================================
--- src/java/org/apache/nutch/protocol/EmptyRobotRules.java	(revision 0)
+++ src/java/org/apache/nutch/protocol/EmptyRobotRules.java	(revision 0)
@@ -0,0 +1,32 @@
+/*
+ * Created on Aug 4, 2006
+ * Author: Andrzej Bialecki <ab@getopt.org>
+ *
+ */
+package org.apache.nutch.protocol;
+
+import java.net.URL;
+
+/**
+ * Permissive {@link RobotRules}: every URL is allowed, and neither a crawl
+ * delay nor an expiry time is set (both getters return -1). Returned by
+ * protocols without robots.txt support, such as file and ftp.
+ */
+public class EmptyRobotRules implements RobotRules {
+
+  /** Shared instance; this class has no state. */
+  public static final RobotRules RULES = new EmptyRobotRules();
+
+  public long getCrawlDelay() {
+    return -1;
+  }
+
+  public long getExpireTime() {
+    return -1;
+  }
+
+  public boolean isAllowed(URL url) {
+    return true;
+  }
+
+}

Property changes on: src/java/org/apache/nutch/protocol/EmptyRobotRules.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: src/java/org/apache/nutch/protocol/RobotRules.java
===================================================================
--- src/java/org/apache/nutch/protocol/RobotRules.java	(revision 0)
+++ src/java/org/apache/nutch/protocol/RobotRules.java	(revision 0)
@@ -0,0 +1,29 @@
+
+package org.apache.nutch.protocol;
+
+import java.net.URL;
+
+
+/**
+ * Rules parsed from a robots.txt file (or a permissive default), which can
+ * test URLs against those rules and report the requested crawl delay.
+ */
+public interface RobotRules {
+  /**
+   * Get the time at which these rules expire; EmptyRobotRules returns -1.
+   */
+  public long getExpireTime();
+
+  /**
+   * Get Crawl-Delay, in milliseconds. This returns -1 if not set.
+   */
+  public long getCrawlDelay();
+
+  /**
+   * Returns false if the robots.txt file
+   * prohibits us from accessing the given url, or
+   * true otherwise.
+   */
+  public boolean isAllowed(URL url);
+
+}

Property changes on: src/java/org/apache/nutch/protocol/RobotRules.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: src/java/org/apache/nutch/protocol/Protocol.java
===================================================================
--- src/java/org/apache/nutch/protocol/Protocol.java	(revision 425684)
+++ src/java/org/apache/nutch/protocol/Protocol.java	(working copy)
@@ -30,6 +30,14 @@
   /** The name of the extension point. */
   public final static String X_POINT_ID = Protocol.class.getName();
 
+  /**
+   * Retrieve robot rules applicable for this url.
+   * @param url url to check
+   * @param datum page datum (implementations may ignore it)
+   * @return robot rules (specific for this url or default), never null
+   */
+  RobotRules getRobotRules(UTF8 url, CrawlDatum datum);
+
   /** Returns the {@link Content} for a fetchlist entry. */
   ProtocolOutput getProtocolOutput(UTF8 url, CrawlDatum datum);
 
Index: src/java/org/apache/nutch/protocol/ProtocolStatus.java
===================================================================
--- src/java/org/apache/nutch/protocol/ProtocolStatus.java	(revision 425684)
+++ src/java/org/apache/nutch/protocol/ProtocolStatus.java	(working copy)
@@ -60,6 +60,10 @@
   public static final int NOTFETCHING = 20;
   /** Unchanged since the last fetch. */
   public static final int NOTMODIFIED = 21;
+  /** Request was refused by the protocol plugin because it would block.
+   * The expected number of milliseconds to wait before a retry may be
+   * provided in args (Fetcher2 reads it via getMessage()). */
+  public static final int WOULDBLOCK = 22;
 
   // Useful static instances for status codes that don't usually require any
   // additional arguments.
@@ -93,6 +97,7 @@
     codeToName.put(new Integer(REDIR_EXCEEDED), "redir_exceeded");
     codeToName.put(new Integer(NOTFETCHING), "notfetching");
     codeToName.put(new Integer(NOTMODIFIED), "notmodified");
+    codeToName.put(new Integer(WOULDBLOCK), "wouldblock");
   }
 
   public ProtocolStatus() {
Index: src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java
===================================================================
--- src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java	(revision 425684)
+++ src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java	(working copy)
@@ -28,9 +28,11 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.EmptyRobotRules;
 import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 
 import java.net.URL;
 
@@ -163,4 +165,8 @@
   public Configuration getConf() {
     return this.conf;
   }
+
+  public RobotRules getRobotRules(UTF8 url, CrawlDatum datum) {
+    return EmptyRobotRules.RULES;   // permissive: all allowed, no crawl delay
+  }
 }
Index:
src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java
===================================================================
--- src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java	(revision 425684)
+++ src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java	(working copy)
@@ -35,6 +35,7 @@
 import org.apache.nutch.protocol.ProtocolException;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 import org.apache.nutch.util.GZIPUtils;
 import org.apache.nutch.util.LogUtil;
 
@@ -70,21 +71,6 @@
   /** The length limit for downloaded content, in bytes. */
   protected int maxContent = 64 * 1024; 
 
-  /** The number of times a thread will delay when trying to fetch a page. */
-  protected int maxDelays = 3;
-
-  /**
-   * The maximum number of threads that should be allowed
-   * to access a host at one time.
-   */
-  protected int maxThreadsPerHost = 1; 
-
-  /**
-   * The number of seconds the fetcher will delay between 
-   * successive requests to the same server.
-   */
-  protected long serverDelay = 1000;
-
   /** The Nutch 'User-Agent' request header */
   protected String userAgent = getAgentString(
                         "NutchCVS", null, "Nutch",
@@ -92,25 +78,6 @@
                         "nutch-agent@lucene.apache.org");
 
 
-  /**
-   * Maps from host to a Long naming the time it should be unblocked.
-   * The Long is zero while the host is in use, then set to now+wait when
-   * a request finishes.  This way only one thread at a time accesses a
-   * host.
-   */
-  private static HashMap BLOCKED_ADDR_TO_TIME = new HashMap();
-  
-  /**
-   * Maps a host to the number of threads accessing that host.
-   */
-  private static HashMap THREADS_PER_HOST_COUNT = new HashMap();
-  
-  /**
-   * Queue of blocked hosts.  This contains all of the non-zero entries
-   * from BLOCKED_ADDR_TO_TIME, ordered by increasing time.
-   */
-  private static LinkedList BLOCKED_ADDR_QUEUE = new LinkedList();
-  
   /** The default logger */
   private final static Log LOGGER = LogFactory.getLog(HttpBase.class);
 
@@ -120,9 +87,6 @@
   /** The nutch configuration */
   private Configuration conf = null;
 
-  /** Do we block by IP addresses or by hostnames? */
-  private boolean byIP = true;
- 
   /** Do we use HTTP/1.1? */
   protected boolean useHttp11 = false;
 
@@ -147,13 +111,8 @@
         this.useProxy = (proxyHost != null && proxyHost.length() > 0);
         this.timeout = conf.getInt("http.timeout", 10000);
         this.maxContent = conf.getInt("http.content.limit", 64 * 1024);
-        this.maxDelays = conf.getInt("http.max.delays", 3);
-        this.maxThreadsPerHost = conf.getInt("fetcher.threads.per.host", 1);
         this.userAgent = getAgentString(conf.get("http.agent.name"), conf.get("http.agent.version"), conf
                 .get("http.agent.description"), conf.get("http.agent.url"), conf.get("http.agent.email"));
-        this.serverDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
-        // backward-compatible default setting
-        this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
         this.useHttp11 = conf.getBoolean("http.http11", false);
         this.robots.setConf(conf);
         logConf();
@@ -171,28 +130,8 @@
     String urlString = url.toString();
     try {
       URL u = new URL(urlString);
-      
-      try {
-        if (!robots.isAllowed(this, u)) {
-          return new ProtocolOutput(null, new ProtocolStatus(ProtocolStatus.ROBOTS_DENIED, url));
-        }
-      } catch (Throwable e) {
-        // XXX Maybe bogus: assume this is allowed.
-        if (logger.isTraceEnabled()) {
-          logger.trace("Exception checking robot rules for " + url + ": " + e);
-        }
-      }
-      
-      long crawlDelay = robots.getCrawlDelay(this, u);
-      long delay = crawlDelay > 0 ? crawlDelay : serverDelay;
-      String host = blockAddr(u, delay);
-      Response response;
-      try {
-        response = getResponse(u, datum, false); // make a request
-      } finally {
-        unblockAddr(host, delay);
-      }
-      
+      
+      Response response = getResponse(u, datum, false); // make a request; robots.txt and politeness are now the caller's job (e.g. Fetcher2)
       int code = response.getCode();
       byte[] content = response.getContent();
       Content c = new Content(u.toString(), u.toString(),
@@ -280,18 +219,6 @@
     return maxContent;
   }
 
-  public int getMaxDelays() {
-    return maxDelays;
-  }
-
-  public int getMaxThreadsPerHost() {
-    return maxThreadsPerHost;
-  }
-
-  public long getServerDelay() {
-    return serverDelay;
-  }
-
   public String getUserAgent() {
     return userAgent;
   }
@@ -300,94 +227,6 @@
     return useHttp11;
   }
   
-  private String blockAddr(URL url, long crawlDelay) throws ProtocolException {
-    
-    String host;
-    if (byIP) {
-      try {
-        InetAddress addr = InetAddress.getByName(url.getHost());
-        host = addr.getHostAddress();
-      } catch (UnknownHostException e) {
-        // unable to resolve it, so don't fall back to host name
-        throw new HttpException(e);
-      }
-    } else {
-      host = url.getHost();
-      if (host == null)
-        throw new HttpException("Unknown host for url: " + url);
-    }
-    host = host.toLowerCase();
-
-    int delays = 0;
-    while (true) {
-      cleanExpiredServerBlocks();                // free held addresses
-
-      Long time;
-      synchronized (BLOCKED_ADDR_TO_TIME) {
-        time = (Long) BLOCKED_ADDR_TO_TIME.get(host);
-        if (time == null) {                       // address is free
-
-          // get # of threads already accessing this addr
-          Integer counter = (Integer)THREADS_PER_HOST_COUNT.get(host);
-          int count = (counter == null) ? 0 : counter.intValue();
-
-          count++;                              // increment & store
-          THREADS_PER_HOST_COUNT.put(host, new Integer(count));
-
-          if (count >= maxThreadsPerHost) {
-            BLOCKED_ADDR_TO_TIME.put(host, new Long(0)); // block it
-          }
-          return host;
-        }
-      }
-
-      if (delays == maxDelays)
-        throw new HttpException("Exceeded http.max.delays: retry later.");
-
-      long done = time.longValue();
-      long now = System.currentTimeMillis();
-      long sleep = 0;
-      if (done == 0) {                            // address is still in use
-        sleep = crawlDelay;                       // wait at least delay
-
-      } else if (now < done) {                    // address is on hold
-        sleep = done - now;                       // wait until its free
-      }
-
-      try {
-        Thread.sleep(sleep);
-      } catch (InterruptedException e) {}
-      delays++;
-    }
-  }
-
-  private void unblockAddr(String host, long crawlDelay) {
-    synchronized (BLOCKED_ADDR_TO_TIME) {
-      int addrCount = ((Integer)THREADS_PER_HOST_COUNT.get(host)).intValue();
-      if (addrCount == 1) {
-        THREADS_PER_HOST_COUNT.remove(host);
-        BLOCKED_ADDR_QUEUE.addFirst(host);
-        BLOCKED_ADDR_TO_TIME.put
-          (host, new Long(System.currentTimeMillis() + crawlDelay));
-      } else {
-        THREADS_PER_HOST_COUNT.put(host, new Integer(addrCount - 1));
-      }
-    }
-  }
-  
-  private static void cleanExpiredServerBlocks() {
-    synchronized (BLOCKED_ADDR_TO_TIME) {
-      while (!BLOCKED_ADDR_QUEUE.isEmpty()) {
-        String host = (String) BLOCKED_ADDR_QUEUE.getLast();
-        long time = ((Long) BLOCKED_ADDR_TO_TIME.get(host)).longValue();
-        if (time <= System.currentTimeMillis()) {
-          BLOCKED_ADDR_TO_TIME.remove(host);
-          BLOCKED_ADDR_QUEUE.removeLast();
-        }
-      }
-    }
-  }
-  
   private static String getAgentString(String agentName,
                                        String agentVersion,
                                        String agentDesc,
@@ -440,8 +279,6 @@
       logger.info("http.timeout = " + timeout);
       logger.info("http.content.limit = " + maxContent);
       logger.info("http.agent = " + userAgent);
-      logger.info("fetcher.server.delay = " + serverDelay);
-      logger.info("http.max.delays = " + maxDelays);
     }
   }
   
@@ -510,4 +347,8 @@
                                           boolean followRedirects)
     throws ProtocolException, IOException;
   
+  public RobotRules getRobotRules(UTF8 url, CrawlDatum datum) {
+    return this.robots.getRobotRulesSet(this, url);   // datum is not used
+  }
+
 }
Index: src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java
===================================================================
--- src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java	(revision 425684)
+++ src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java	(working copy)
@@ -33,9 +33,11 @@
 // Nutch imports
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.UTF8;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.net.protocols.Response;
 import org.apache.nutch.protocol.ProtocolException;
+import org.apache.nutch.protocol.RobotRules;
 
 
 /**
@@ -69,7 +71,7 @@
    * This class holds the rules which were parsed from a robots.txt
    * file, and can test paths against those rules.
    */
-  public static class RobotRuleSet {
+  public static class RobotRuleSet implements RobotRules {
     ArrayList tmpEntries = new ArrayList();
     RobotsEntry[] entries = null;
     long expireTime;
@@ -143,10 +145,18 @@
 
     /**
      *  Returns <code>false</code> if the <code>robots.txt</code> file
-     *  prohibits us from accessing the given <code>path</code>, or
+     *  prohibits us from accessing the given <code>url</code>, or
      *  <code>true</code> otherwise.
      */ 
-    public boolean isAllowed(String path) {
+    public boolean isAllowed(URL url) {
+      String path = url.getPath();                  // check rules
+      if ((path == null) || "".equals(path)) {
+        path= "/";                                  // an empty path means the root
+      }
+      return isAllowed(path);
+    }
+    
+    boolean isAllowed(String path) {                // path-based check used by isAllowed(URL)
       try {
         path= URLDecoder.decode(path, CHARACTER_ENCODING);
       } catch (Exception e) {
@@ -414,7 +424,17 @@
     return rules;
   }
   
-  private RobotRuleSet getRobotRulesSet(HttpBase http, URL url) {
+  public RobotRuleSet getRobotRulesSet(HttpBase http, UTF8 url) {
+    URL u = null;
+    try {
+      u = new URL(url.toString());
+    } catch (Exception e) {
+      return EMPTY_RULES;                           // malformed url: fall back to EMPTY_RULES
+    }
+    return getRobotRulesSet(http, u);
+  }
+  
+  public RobotRuleSet getRobotRulesSet(HttpBase http, URL url) {
 
     String host = url.getHost().toLowerCase(); // normalize to lower case
 
@@ -446,12 +466,8 @@
 
   public boolean isAllowed(HttpBase http, URL url) 
       throws ProtocolException, IOException {
-    String path = url.getPath();                  // check rules
-    if ((path == null) || "".equals(path)) {
-      path= "/";
-    }
 
-    return getRobotRulesSet(http, url).isAllowed(path);
+    return getRobotRulesSet(http, url).isAllowed(url);
   }
 
   public long getCrawlDelay(HttpBase http, URL url) 
Index: src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java
===================================================================
--- src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java	(revision 425684)
+++ src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java	(working copy)
@@ -29,9 +29,11 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.EmptyRobotRules;
 import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 
 import java.net.URL;
 
@@ -237,4 +239,8 @@
     return this.conf;
   }
 
+  public RobotRules getRobotRules(UTF8 url, CrawlDatum datum) {
+    return EmptyRobotRules.RULES;
+
} + } Index: src/plugin/protocol-httpclient/src/java/org/apache/nutch/protocol/httpclient/Http.java =================================================================== --- src/plugin/protocol-httpclient/src/java/org/apache/nutch/protocol/httpclient/Http.java (revision 425684) +++ src/plugin/protocol-httpclient/src/java/org/apache/nutch/protocol/httpclient/Http.java (working copy) @@ -110,11 +110,6 @@ params.setSendBufferSize(BUFFER_SIZE); params.setReceiveBufferSize(BUFFER_SIZE); params.setMaxTotalConnections(maxThreadsTotal); - if (maxThreadsTotal > maxThreadsPerHost) { - params.setDefaultMaxConnectionsPerHost(maxThreadsPerHost); - } else { - params.setDefaultMaxConnectionsPerHost(maxThreadsTotal); - } HostConfiguration hostConf = client.getHostConfiguration(); ArrayList headers = new ArrayList();