Issue Details

Key: NUTCH-368
Type: New Feature
Status: Closed
Resolution: Won't Fix
Priority: Major
Assignee: Andrzej Bialecki
Reporter: Andrzej Bialecki
Votes: 2
Watchers: 3
Nutch

Message queueing system

Created: 15/Sep/06 08:36 PM   Updated: 22/Jan/08 02:48 PM
Component/s: None
Affects Version/s: 0.9.0
Fix Version/s: None

Time Tracking:
Not Specified

File Attachments:
  Fetcher-ctrl.patch   2006-09-26 09:53 AM   Andrzej Bialecki   5 kB    (text file; licensed for inclusion in ASF works)
  msg.tgz              2006-09-15 08:38 PM   Andrzej Bialecki   13 kB   (licensed for inclusion in ASF works)
Issue Links:
Reference
 

Resolution Date: 22/Jan/08 02:48 PM


 Description
This is an implementation of a filesystem-based message queueing system. The motivation for this functionality is explained in HADOOP-490 - there is nothing Nutch-specific in this implementation, so if it's considered generally useful it could be moved there.

Below are excerpts from the included javadocs.

The model of the system is as follows:

  • applications (including map-reduce jobs) may create their own separate message queueing area. Alternatively, they can explicitly ask for a named message queue that belongs to a different application or exists as a system-wide queue. Message queues are created under "/mq", in a subdirectory named after the message queue id (for map-reduce jobs this is the job id; otherwise it can be any other name passed as the job id to the constructor).
    Please see the example for more information.
  • a single unit of information passing through queues is a Msg, which has a unique identifier (consisting of its creation time and publisher name), a string subject, and content (a Writable).
  • a single MsgQueue in fact consists of any number of topics. There are four predefined ones: in, out, err, and ctrl.
  • messages are published to topics, which present a sequential view of messages, sorted by msgId (which corresponds to their order of arrival).
  • each message queue may periodically poll for changes (MsgQueue.startPolling()), using a separate thread. Polling updates the list of topics and messages. The poll interval is configurable and defaults to 5 seconds.
  • each detected change in the queue (add/remove topic, add/remove message) may be communicated to registered listeners. Out-of-band messages are not supported in this version, but they would not be too complicated to add. Applications can create listeners that watch queues for newly added or deleted messages, added or deleted topics, etc.
  • each instance of MsgQueue using the same physical queue maintains its own view of the queue, keeping track of topics and messages that it considers "processed and discarded". In other words, multiple readers and creators may modify queues, and each knows which messages it already processed and which ones are new. In a similar fashion, instances may willfully "remove" certain topics from their view, even though these topics still physically exist and are available for other instances (and later on they can "add" them to their view again).
    This somewhat complicated feature was implemented in order to support multiple readers for the same message (e.g. many tasks per one mapred job). Each task needs to register for the same queue, and if they didn't have their own views of the queue, messages would be consumed by the first task that got to them. As it is implemented now, each task may consume messages at its own pace. At the end of the job applications may elect to keep the queue around or to destroy it (and thus remove all topics and messages in it).
  • messages, topics and queues may be destroyed by any user, at which point they are physically removed from the filesystem. All users will gradually update their views, during the next poll operation.
  • there is a command-line tool to examine and modify queues, and also to retrieve and send simple text messages. You can run it like this:

bin/nutch org.apache.nutch.util.msg.MsgQueueTool ...many options...
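To make the model above more concrete, here is a minimal Java sketch of a single topic kept as a directory on the local filesystem. In this sketch, java.nio stands in for the Hadoop FileSystem API, which the sketch does not use. The class name TopicViewSketch, the "<creationTime>-<publisher>" file naming and the zero-padding are assumptions for illustration and are not taken from msg.tgz. The sketch shows two points from the list. First, messages are ordered by an id built from creation time and publisher name. Second, each instance keeps its own view of which messages it has processed, so two readers of the same topic both see every message.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch, not the MsgQueue API: one topic = one directory, one file per message. */
public class TopicViewSketch {
    private final Path topicDir;
    // Ids this instance has processed; other instances on the same directory keep their own sets.
    private final Set<String> processed = new HashSet<>();

    public TopicViewSketch(Path topicDir) {
        this.topicDir = topicDir;
        try {
            Files.createDirectories(topicDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Publishes a message; its id (creation time + publisher) doubles as the file name. */
    public String publish(long creationTime, String publisher, String content) {
        // Zero-padding makes lexicographic order of ids match creation-time order.
        String msgId = String.format("%020d-%s", creationTime, publisher);
        try {
            Files.write(topicDir.resolve(msgId), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return msgId;
    }

    /** Lists the topic, sorts by id, and returns the ids this instance has not processed yet. */
    public List<String> poll() {
        List<String> ids = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(topicDir)) {
            for (Path p : files) {
                ids.add(p.getFileName().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(ids);
        processed.retainAll(ids); // forget messages that someone destroyed
        List<String> fresh = new ArrayList<>();
        for (String id : ids) {
            if (!processed.contains(id)) {
                fresh.add(id);
            }
        }
        return fresh;
    }

    public String read(String msgId) {
        try {
            return new String(Files.readAllBytes(topicDir.resolve(msgId)), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Marks a message as processed in this view only; the file stays for other readers. */
    public void discard(String msgId) {
        processed.add(msgId);
    }

    public static void main(String[] args) {
        Path ctrl = Paths.get(System.getProperty("java.io.tmpdir"),
                "mq-demo-" + System.nanoTime(), "mq", "job_0001", "ctrl");
        TopicViewSketch taskA = new TopicViewSketch(ctrl);
        TopicViewSketch taskB = new TopicViewSketch(ctrl);
        taskA.publish(2000L, "admin", "HALT");
        taskA.publish(1000L, "cron", "THREADS 50"); // older, so it sorts first
        List<String> seenByA = taskA.poll();
        taskA.discard(seenByA.get(0));
        System.out.println(taskA.read(seenByA.get(0))); // THREADS 50
        System.out.println(taskA.poll().size());         // 1
        System.out.println(taskB.poll().size());         // 2
    }
}
```

Because discard() only updates an in-memory set, physically deleting a message file is the only way to remove that message for everyone. Each reader's next poll() then drops it, which loosely mirrors the "gradually update their views" behaviour described above.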



 Comments
Andrzej Bialecki added a comment - 15/Sep/06 08:38 PM
Implementation + JUnit tests.

Doug Cutting added a comment - 18/Sep/06 05:32 PM
How would you compare this to JMS?

http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/package-summary.html

Is it fundamentally different, primarily a simplification, better integrated w/ Nutch/Hadoop, or what?


Andrzej Bialecki added a comment - 18/Sep/06 06:41 PM
It is modeled after the core concepts in JMS, in the sense that there are topics, queues and messages. Of course it's a simplification, but there are many similarities, so for people familiar with JMS it should also look familiar.

Highlights of JMS vs. this API:

  • this API works now. I haven't even looked at implementing a JMS-compliant provider on top of Hadoop; it would surely be much more complicated and take much more time...
  • in JMS there are point-to-point and publish-subscribe communication models. This API is most similar to the pub-sub model, in the sense that each message consumer tracks messages it consumed independently of all other consumers. I selected this model in order to support sending messages to map-reduce jobs, where there are many receivers (tasks) and possibly many senders.
  • the API to browse topics, and select which messages to process (in-order or out-of-order) is IMHO much more natural than the one in JMS API.
  • JMS provides PERSISTENT and NON_PERSISTENT delivery modes. This API provides only a persistent mode; however, at the end of the job applications may elect to remove all queues and their content.
  • JMS supports both blocking and non-blocking reads. This API only supports non-blocking reads.
  • both JMS and this API support event-driven message processing.
  • JMS supports "message selectors" which filter incoming messages according to their properties. This API doesn't support it, but it can be easily added if needed.
  • JMS supports Publishers and Subscribers as separate interfaces. This API doesn't make such a distinction: each client connected to a queue may both receive and send messages.
  • JMS supports transaction-oriented communication; this is not supported here.
  • the only persistence mechanism supported now is plain files on a Hadoop FileSystem (either local or DFS), and it works transparently, i.e. it doesn't require any special support from Hadoop.
  • messages may contain any Writable, so it's a natural API from Hadoop's point of view.

So IMHO this gives a fairly large subset of JMS functionality in a simple-to-understand (and easy-to-maintain) implementation. Additionally, it doesn't require any modifications in Hadoop, although it could surely use some to integrate better with map-reduce jobs. For example, TaskTrackers could be responsible for starting queue sessions for jobs that request this, and instead of polling for FileSystem updates we could have filesystem monitors, etc. But this is not strictly necessary; this API works as it is now.
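The key difference between the two JMS communication models mentioned above can be sketched in a few lines of plain Java. The class names are hypothetical. For brevity, the per-consumer view is simplified to an in-order read position, whereas the API described here tracks processed message ids and therefore also allows out-of-order processing. With a point-to-point queue, the first task to read a message consumes it. With per-consumer views, every task sees every message, which is what sending a command to all tasks of a map-reduce job requires. Both receive methods are non-blocking and return null when nothing is available.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory sketch contrasting the two consumption models. */
public class DeliveryModels {

    /** Point-to-point: one shared queue; each message is delivered to exactly one consumer. */
    static class PointToPoint {
        private final Queue<String> queue = new ArrayDeque<>();

        void send(String msg) { queue.add(msg); }

        String receive() { return queue.poll(); } // non-blocking: null when empty
    }

    /** Pub-sub style: a shared log plus an independent read position per consumer. */
    static class PerConsumerView {
        private final List<String> log = new ArrayList<>();
        private final Map<String, Integer> positions = new HashMap<>();

        void send(String msg) { log.add(msg); }

        String receive(String consumer) {
            int pos = positions.getOrDefault(consumer, 0);
            if (pos >= log.size()) {
                return null; // nothing new for this consumer
            }
            positions.put(consumer, pos + 1);
            return log.get(pos);
        }
    }

    public static void main(String[] args) {
        PointToPoint p2p = new PointToPoint();
        p2p.send("HALT");
        System.out.println(p2p.receive()); // task 1 gets HALT
        System.out.println(p2p.receive()); // task 2 gets null: already consumed

        PerConsumerView view = new PerConsumerView();
        view.send("HALT");
        System.out.println(view.receive("task1")); // HALT
        System.out.println(view.receive("task2")); // HALT as well
    }
}
```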


Sami Siren added a comment - 18/Sep/06 07:16 PM
IMO the place for stuff like this is in Hadoop rather than Nutch, and I would like to see this implemented there.

Mainly because I see it more as part of the distributed architecture (that Hadoop is providing) than as specialized search-engine functionality (that Nutch is providing).

Also, have you considered using something readily available instead of implementing it (well, that part is already done) and taking on the burden of maintaining it?


Andrzej Bialecki added a comment - 19/Sep/06 07:18 AM
> IMO the place for stuff like this is in Hadoop rather than Nutch, and I would like to see this implemented there.

Agreed. I needed this to support certain Nutch extensions (e.g. gracefully stopping long-running jobs, adjusting bandwidth throttling on a running fetcher, etc.), and I didn't want to wait until Nutch catches up with that version of Hadoop (if it were ever accepted there).

> Also, have you considered using something readily available instead of implementing it (well, that part is already done) and taking on the burden of maintaining it?

I'd gladly do so; however, I couldn't find anything like that which was not at the same time a JMS-compliant stack (with one exception, which was GPL-ed). I didn't want to bring in the whole weight and complexity of J2EE, and I didn't want to require a separate database for persistence (yet another point of failure).

This API uses the persistence, redundancy, scalability and communication mechanisms of Hadoop, so I get the most complex parts of JMS for free; the rest is relatively simple.


Andrzej Bialecki added a comment - 26/Sep/06 09:53 AM
This patch uses the message queueing framework to implement the following functionality in Fetcher:
  • ability to gracefully stop fetching the current segment. This is different from simply killing the job in that the partial results (partially fetched segment) are available and can be further processed. This is especially useful for fetching large segments with long "tails", i.e. pages which are fetched very slowly, either because of politeness settings or the target site's bandwidth limitations.
  • ability to dynamically adjust the number of fetcher threads. For a long-running fetch job it makes sense to decrease the number of fetcher threads during the day and increase it during the night. This can now be done with a cron script, using the MsgQueueTool command-line tool.

It's worthwhile to note that the patch itself is trivial, and most of the work is done by the MQ framework.

After you apply this patch, you can start a long-running fetcher job, check its <job_id>, and control the fetcher this way:

bin/nutch org.apache.nutch.util.msg.MsgQueueTool -createMsg <job_id> ctrl THREADS 50

This adjusts the number of threads to 50 (starting more threads or stopping some threads as necessary).
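Here is a minimal sketch of how a fetcher might react to the THREADS control message. It starts workers up to the requested count, or it flags surplus workers so that they exit after finishing their current URL. This is not code from Fetcher-ctrl.patch. The names ThreadCountSketch and handle() are hypothetical. Treating the message as a single string "THREADS <n>" is an assumption based on the command line above. fetchOneUrl is a placeholder for the real fetch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: resize a set of fetcher threads in response to "THREADS <n>". */
public class ThreadCountSketch {
    private final List<AtomicBoolean> stopFlags = new ArrayList<>(); // one per worker not yet told to stop
    private final Runnable fetchOneUrl; // placeholder for fetching a single URL

    public ThreadCountSketch(Runnable fetchOneUrl) {
        this.fetchOneUrl = fetchOneUrl;
    }

    /** Handles a control message such as "THREADS 50"; returns false if it is not understood. */
    public synchronized boolean handle(String body) {
        String[] parts = body.trim().split("\\s+");
        if (parts.length != 2 || !parts[0].equals("THREADS")) {
            return false; // not a THREADS command (HALT would be handled elsewhere)
        }
        int target;
        try {
            target = Integer.parseInt(parts[1]);
        } catch (NumberFormatException e) {
            return false;
        }
        if (target < 0) {
            return false;
        }
        while (stopFlags.size() < target) {
            startWorker();
        }
        while (stopFlags.size() > target) {
            // The surplus worker notices the flag only after finishing its current URL.
            stopFlags.remove(stopFlags.size() - 1).set(true);
        }
        return true;
    }

    /** Workers not asked to stop (ones already flagged may still be finishing a URL). */
    public synchronized int activeWorkers() {
        return stopFlags.size();
    }

    private void startWorker() {
        AtomicBoolean stop = new AtomicBoolean(false);
        stopFlags.add(stop);
        Thread worker = new Thread(() -> {
            while (!stop.get()) {
                fetchOneUrl.run();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    public static void main(String[] args) {
        ThreadCountSketch fetcher = new ThreadCountSketch(() -> {
            try {
                Thread.sleep(10); // pretend to fetch one URL
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.handle("THREADS 50");
        System.out.println(fetcher.activeWorkers()); // 50
        fetcher.handle("THREADS 10");
        System.out.println(fetcher.activeWorkers()); // 10
        fetcher.handle("THREADS 0"); // flag every worker to stop
    }
}
```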

Then run:

bin/nutch org.apache.nutch.util.msg.MsgQueueTool -createMsg <job_id> ctrl HALT

This will gracefully shut down all threads after they finish fetching their current URL, then finish the job, keeping the partial segment data intact.
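The graceful HALT described above comes down to a flag that fetcher threads check only between URLs. The sketch below uses hypothetical names, and a sleep stands in for a slow fetch; it is not the patch's implementation. It runs a few worker threads over a URL queue and sets the flag as a HALT message would. It then waits for the threads to exit and returns the URLs that were fully fetched, which stand for the partial segment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: stop fetcher threads between URLs and keep the partial results. */
public class GracefulHaltSketch {
    private final BlockingQueue<String> urls = new LinkedBlockingQueue<>();
    private final List<String> fetched = new CopyOnWriteArrayList<>(); // the partial "segment"
    private volatile boolean halted = false;

    public GracefulHaltSketch(List<String> toFetch) {
        urls.addAll(toFetch);
    }

    /** What a fetcher would do on receiving a HALT control message. */
    public void halt() {
        halted = true;
    }

    private void workerLoop() {
        while (!halted) { // the flag is checked only between URLs
            String url = urls.poll();
            if (url == null) {
                return; // nothing left to fetch
            }
            try {
                Thread.sleep(20); // stand-in for a slow fetch of one URL
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            fetched.add(url); // keep the result of the URL this thread started
        }
    }

    /** Runs n fetcher threads, "sends" HALT after haltAfterMs, and returns what was fetched. */
    public List<String> run(int n, long haltAfterMs) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(this::workerLoop);
            threads.add(t);
            t.start();
        }
        try {
            Thread.sleep(haltAfterMs);
            halt();
            for (Thread t : threads) {
                t.join(); // each thread exits after finishing its current URL
            }
        } catch (InterruptedException e) {
            halt();
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(fetched);
    }

    public static void main(String[] args) {
        List<String> all = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            all.add("http://example.com/page" + i);
        }
        List<String> partial = new GracefulHaltSketch(all).run(4, 100);
        System.out.println("fetched " + partial.size() + " of " + all.size() + " URLs before HALT");
    }
}
```

Unlike killing the job, nothing already fetched is lost. Each thread either finishes its URL and records it, or never starts it.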


Chris Chiappone added a comment - 15/Jan/08 09:08 PM
I tried to run this patch, but I'm not sure that it works with Nutch 0.9. Is there a way to make this work, or any other way to do this?

Thanks


Andrzej Bialecki added a comment - 22/Jan/08 02:48 PM
This solution is too heavy on the namenode, so it's suitable only for very low message volumes. As such, it's not generally applicable and should not be added to Nutch. See also HADOOP-490.
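A rough estimate shows why a polling, filesystem-based queue loads the namenode even when no messages are sent. The issue does not state these assumptions. Assume that each poll lists the queue directory once and each of its K topic directories once. Also assume that N MsgQueue instances poll the same queue with poll interval T. Then the steady-state rate R of listing requests is approximately:

```latex
R \approx \frac{N\,(1 + K)}{T}
\qquad\text{e.g.}\qquad
N = 1000,\; K = 4,\; T = 5\,\mathrm{s}
\;\Rightarrow\;
R \approx \frac{1000 \cdot 5}{5\,\mathrm{s}} = 1000 \text{ listing requests/s}
```

Here N = 1000 is a hypothetical task count. K = 4 is the number of predefined topics, and T = 5 s is the default poll interval from the description. Reading and writing the message files themselves adds further namenode operations on top of this baseline.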