Index: src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher.java	(revision 449293)
+++ src/java/org/apache/nutch/fetcher/Fetcher.java	(working copy)
@@ -17,6 +17,8 @@
 package org.apache.nutch.fetcher;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
 
 // Commons Logging imports
 import org.apache.commons.logging.Log;
@@ -36,8 +38,16 @@
 import org.apache.nutch.parse.*;
 import org.apache.nutch.scoring.ScoringFilters;
 import org.apache.nutch.util.*;
+import org.apache.nutch.util.msg.Msg;
+import org.apache.nutch.util.msg.MsgQueue;
+import org.apache.nutch.util.msg.MsgQueueException;
+import org.apache.nutch.util.msg.MsgTopic;
+import org.apache.nutch.util.msg.MsgTopicEvent;
+import org.apache.nutch.util.msg.MsgTopicListener;
 
+import sun.security.krb5.internal.ktab.a0;
 
+
 /** The fetcher. Most of the work is done by plugins. */
 public class Fetcher extends ToolBase implements MapRunnable { 
 
@@ -77,6 +87,8 @@
 
   private boolean storingContent;
   private boolean parsing;
+  private MsgQueue mq;
+  private HashSet threads = new HashSet();
 
   private class FetcherThread extends Thread {
     private Configuration conf;
@@ -85,6 +97,7 @@
     private ParseUtil parseUtil;
     private URLNormalizers normalizers;
     private ProtocolFactory protocolFactory;
+    private boolean haltRequested = false;
 
     public FetcherThread(Configuration conf) {
       this.setDaemon(true);                       // don't hang JVM on exit
@@ -110,7 +123,10 @@
           // if (conf.getBoolean("fetcher.exit", false)) {
           //   break;
           // ]
-          
+          if (haltRequested) {
+            LOG.info("HALT requested, quitting...");
+            break;
+          }
           try {                                   // get next entry from input
             if (!input.next(key, datum)) {
               break;                              // at eof, exit
@@ -338,7 +354,7 @@
     String status;
     synchronized (this) {
       long elapsed = (System.currentTimeMillis() - start)/1000;
-      status = 
+      status = activeThreads + " threads, " + 
         pages+" pages, "+errors+" errors, "
         + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
         + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";
@@ -346,6 +362,51 @@
     reporter.setStatus(status);
   }
 
+  private static class ControlListener implements MsgTopicListener {
+    private Fetcher f;
+    
+    public ControlListener(Fetcher f) {
+      this.f = f;
+    }
+
+    public void processEvent(MsgTopic topic, MsgTopicEvent evt) throws Exception {
+      if (evt.getType() == evt.TYPE_MSG_ADDED) {
+        Msg m = topic.getMsg(evt.getMsgId());
+        if (m.getSubject().equals("HALT")) {
+          LOG.info("-> HALT requested!");
+          Iterator it = f.threads.iterator();
+          while (it.hasNext()) {
+            FetcherThread ft = (FetcherThread)it.next();
+            ft.haltRequested = true;
+          }
+        } else if (m.getSubject().equals("THREADS")) {
+          try {
+            int cnt = Integer.parseInt(m.getContent().toString());
+            LOG.info("-> THREADS set to " + cnt);
+            Iterator it = f.threads.iterator();
+            if (cnt < f.activeThreads) {
+              // stop some threads
+              int i = f.activeThreads - cnt;
+              while (it.hasNext() && i-- > 0) {
+                FetcherThread ft = (FetcherThread)it.next();
+                ft.haltRequested = true;
+              }
+            } else {
+              // start some threads
+              for (int i = 0; i < cnt - f.activeThreads; i++) {
+                FetcherThread ft = f.new FetcherThread(f.getConf());
+                ft.start();
+                f.threads.add(ft);
+              }
+            }
+          } catch (Exception e) {
+            LOG.warn("Failed to parse THREADS message: " + StringUtils.stringifyException(e));
+          }
+        }
+      }
+    }
+  }
+  
   public Fetcher() {
     
   }
@@ -364,6 +425,16 @@
 //    if (job.getBoolean("fetcher.verbose", false)) {
 //      LOG.setLevel(Level.FINE);
 //    }
+    try {
+      mq = new MsgQueue(job);
+      MsgTopic ctrl = mq.getMsgTopic(MsgTopic.TOPIC_CONTROL);
+      ctrl.addMsgTopicListener(new ControlListener(this));
+      mq.startPolling();
+    } catch (Exception e) {
+      LOG.warn("Couldn't start MQ: " + StringUtils.stringifyException(e));
+      mq = null;
+    }
+    
   }
 
   public void close() {}
@@ -389,7 +460,9 @@
     if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
 
     for (int i = 0; i < threadCount; i++) {       // spawn threads
-      new FetcherThread(getConf()).start();
+      FetcherThread ft = new FetcherThread(getConf());
+      ft.start();
+      threads.add(ft);
     }
 
     // select a timeout that avoids a task timeout
@@ -411,6 +484,11 @@
           return;
         }
       }
+      Iterator it = threads.iterator();
+      while (it.hasNext()) {
+        FetcherThread ft = (FetcherThread)it.next();
+        if (!ft.isAlive()) it.remove();
+      }
 
     } while (activeThreads > 0);
     
