Index: Indexer.java =================================================================== --- Indexer.java (revision 652713) +++ Indexer.java (working copy) @@ -17,36 +17,53 @@ package org.apache.nutch.indexer; -import java.io.*; -import java.util.*; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; -import org.apache.nutch.parse.*; -import org.apache.nutch.analysis.*; - -import org.apache.nutch.scoring.ScoringFilterException; -import org.apache.nutch.scoring.ScoringFilters; -import org.apache.nutch.util.LogUtil; -import org.apache.nutch.util.NutchConfiguration; -import org.apache.nutch.util.NutchJob; - +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.CrawlDb; import org.apache.nutch.crawl.Inlinks; import org.apache.nutch.crawl.LinkDb; import org.apache.nutch.crawl.NutchWritable; - -import org.apache.lucene.index.*; -import org.apache.lucene.document.*; +import org.apache.nutch.indexer.lucene.LuceneConstants; +import org.apache.nutch.indexer.lucene.LuceneWriter; +import org.apache.nutch.indexer.solr.SolrConstants; +import org.apache.nutch.indexer.solr.SolrWriter; import org.apache.nutch.metadata.Metadata; import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseImpl; +import org.apache.nutch.parse.ParseText; +import org.apache.nutch.scoring.ScoringFilterException; +import org.apache.nutch.scoring.ScoringFilters; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; /** Create indexes for segments. */ public class Indexer extends Configured implements Tool, Reducer, Mapper { @@ -54,102 +71,79 @@ public static final String DONE_NAME = "index.done"; public static final Log LOG = LogFactory.getLog(Indexer.class); - - /** A utility class used to pass a lucene document from Indexer.reduce - * to Indexer.OutputFormat. - * Note: Despite its name, it can't properly wrap a lucene document - it - * doesn't know how to serialize/deserialize a lucene document. - */ - private static class LuceneDocumentWrapper implements Writable { - private Document doc; - - public LuceneDocumentWrapper(Document doc) { - this.doc = doc; - } - - public Document get() { - return doc; - } - public void readFields(DataInput in) throws IOException { - // intentionally left blank - } - - public void write(DataOutput out) throws IOException { - // intentionally left blank - } - - } + private static final String LUCENE_ENABLED_KEY = + "indexer.lucene.backend.enabled"; - /** Unwrap Lucene Documents created by reduce and add them to an index. */ + private static final String SOLR_ENABLED_KEY = + "indexer.solr.backend.enabled"; + public static class OutputFormat - extends org.apache.hadoop.mapred.OutputFormatBase { - public RecordWriter getRecordWriter(final FileSystem fs, JobConf job, - String name, final Progressable progress) throws IOException { - final Path perm = new Path(job.getOutputPath(), name); - final Path temp = - job.getLocalPath("index/_"+Integer.toString(new Random().nextInt())); + extends org.apache.hadoop.mapred.OutputFormatBase { + + public RecordWriter getRecordWriter(FileSystem fs, JobConf job, + String name, final Progressable progress) throws IOException { + + final List writers = new ArrayList(); - fs.delete(perm); // delete old, if any + if (job.getBoolean(LUCENE_ENABLED_KEY, false)) { + LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); + LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); + LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); - final AnalyzerFactory factory = new AnalyzerFactory(job); - final IndexWriter writer = // build locally first - new IndexWriter(fs.startLocalOutput(perm, temp).toString(), - new NutchDocumentAnalyzer(job), true); + job.set(LuceneConstants.OUTPUT_DIR, + new Path(job.getOutputPath(), name).toString()); - writer.setMergeFactor(job.getInt("indexer.mergeFactor", 10)); - writer.setMaxBufferedDocs(job.getInt("indexer.minMergeDocs", 100)); - writer.setMaxMergeDocs(job.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE)); - writer.setTermIndexInterval - (job.getInt("indexer.termIndexInterval", 128)); - writer.setMaxFieldLength(job.getInt("indexer.max.tokens", 10000)); - writer.setInfoStream(LogUtil.getInfoStream(LOG)); - writer.setUseCompoundFile(false); - writer.setSimilarity(new NutchSimilarity()); + Path temp = job.getLocalPath("index/_" + + Integer.toString(new Random().nextInt())); + job.set(LuceneConstants.TEMP_OUTPUT_DIR, temp.toString()); - return new RecordWriter() { - boolean closed; + writers.add(new LuceneWriter()); + } - public void write(WritableComparable key, LuceneDocumentWrapper value) - throws IOException { // unwrap & index doc - Document doc = value.get(); - NutchAnalyzer analyzer = factory.get(doc.get("lang")); - if (LOG.isInfoEnabled()) { - LOG.info(" Indexing [" + doc.getField("url").stringValue() + "]" + - " with analyzer " + analyzer + - " (" + doc.get("lang") + ")"); - } - writer.addDocument(doc, analyzer); - progress.progress(); + if (job.getBoolean(SOLR_ENABLED_KEY, false)) { + writers.add(new SolrWriter()); + } + + for (NutchIndexWriter writer : writers) { + writer.open(job); + } + + return new RecordWriter() { + boolean closed; + + public void write(Text key, NutchDocument doc) + throws IOException { + for (NutchIndexWriter writer : writers) { + writer.write(doc); } - - public void close(final Reporter reporter) throws IOException { - // spawn a thread to give progress heartbeats - Thread prog = new Thread() { - public void run() { - while (!closed) { - try { - reporter.setStatus("closing"); - Thread.sleep(1000); - } catch (InterruptedException e) { continue; } - catch (Throwable e) { return; } - } - } - }; + progress.progress(); + } - try { - prog.start(); - if (LOG.isInfoEnabled()) { LOG.info("Optimizing index."); } - // optimize & close index - writer.optimize(); + public void close(final Reporter reporter) throws IOException { + // spawn a thread to give progress heartbeats + Thread prog = new Thread() { + public void run() { + while (!closed) { + try { + reporter.setStatus("closing"); + Thread.sleep(1000); + } catch (InterruptedException e) { continue; } + catch (Throwable e) { return; } + } + } + }; + + try { + prog.start(); + for (NutchIndexWriter writer : writers) { writer.close(); - fs.completeLocalOutput(perm, temp); // copy to dfs - fs.createNewFile(new Path(perm, DONE_NAME)); - } finally { - closed = true; } + } finally { + closed = true; } - }; + } + }; } } @@ -217,26 +211,15 @@ return; } - Document doc = new Document(); + NutchDocument doc = new NutchDocument(); Metadata metadata = parseData.getContentMeta(); // add segment, used to map from merged index back to segment files - doc.add(new Field("segment", metadata.get(Nutch.SEGMENT_NAME_KEY), - Field.Store.YES, Field.Index.NO)); + doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY)); // add digest, used by dedup - doc.add(new Field("digest", metadata.get(Nutch.SIGNATURE_KEY), - Field.Store.YES, Field.Index.NO)); + doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY)); -// if (LOG.isInfoEnabled()) { -// LOG.info("Url: "+key.toString()); -// LOG.info("Title: "+parseData.getTitle()); -// LOG.info(crawlDatum.toString()); -// if (inlinks != null) { -// LOG.info(inlinks.toString()); -// } -// } - Parse parse = new ParseImpl(parseText, parseData); try { // extract information from dbDatum and pass it to @@ -267,35 +250,52 @@ return; } // apply boost to all indexed fields. - doc.setBoost(boost); + doc.setScore(boost); // store boost for use by explain and dedup - doc.add(new Field("boost", Float.toString(boost), - Field.Store.YES, Field.Index.NO)); + doc.add("boost", Float.toString(boost)); - output.collect(key, new LuceneDocumentWrapper(doc)); + output.collect(key, doc); } - public void index(Path indexDir, Path crawlDb, Path linkDb, Path[] segments) - throws IOException { + public void index(Path luceneDir, String solrUrl, Path crawlDb, + Path linkDb, Collection segments) + throws IOException { - if (LOG.isInfoEnabled()) { - LOG.info("Indexer: starting"); - LOG.info("Indexer: linkdb: " + linkDb); - } + LOG.info("Indexer: starting"); + LOG.info("Indexer: crawldb: " + crawlDb); + LOG.info("Indexer: linkdb: " + linkDb); JobConf job = new NutchJob(getConf()); - job.setJobName("index " + indexDir); + String jobName = "index"; + if (luceneDir != null) { + // add lucene output dir to job name + jobName += " lucene=" + luceneDir; + job.setBoolean(LUCENE_ENABLED_KEY, true); + LOG.info("Indexer: luceneDir: " + luceneDir); + } else { + job.setBoolean(LUCENE_ENABLED_KEY, false); + } - for (int i = 0; i < segments.length; i++) { - if (LOG.isInfoEnabled()) { - LOG.info("Indexer: adding segment: " + segments[i]); - } - job.addInputPath(new Path(segments[i], CrawlDatum.FETCH_DIR_NAME)); - job.addInputPath(new Path(segments[i], CrawlDatum.PARSE_DIR_NAME)); - job.addInputPath(new Path(segments[i], ParseData.DIR_NAME)); - job.addInputPath(new Path(segments[i], ParseText.DIR_NAME)); + if (solrUrl != null) { + // add solr server url to job name + jobName += " solr=" + solrUrl; + job.setBoolean(SOLR_ENABLED_KEY, true); + job.set(SolrConstants.SERVER_URL, solrUrl); + LOG.info("Indexer: solrUrl: " + solrUrl); + } else { + job.setBoolean(SOLR_ENABLED_KEY, false); } + job.setJobName(jobName); + + for (Path segment : segments) { + LOG.info("Indexer: adding segment: " + segment); + job.addInputPath(new Path(segment, CrawlDatum.FETCH_DIR_NAME)); + job.addInputPath(new Path(segment, CrawlDatum.PARSE_DIR_NAME)); + job.addInputPath(new Path(segment, ParseData.DIR_NAME)); + job.addInputPath(new Path(segment, ParseText.DIR_NAME)); + } + job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME)); job.addInputPath(new Path(linkDb, LinkDb.CURRENT_NAME)); job.setInputFormat(SequenceFileInputFormat.class); @@ -303,13 +303,23 @@ job.setMapperClass(Indexer.class); job.setReducerClass(Indexer.class); - job.setOutputPath(indexDir); + Path tempDir = new Path("indexer-" + + new Random().nextInt(Integer.MAX_VALUE)); + if (luceneDir == null) { + job.setOutputPath(tempDir); + } else { + job.setOutputPath(luceneDir); + } job.setOutputFormat(OutputFormat.class); job.setOutputKeyClass(Text.class); - job.setOutputValueClass(NutchWritable.class); + job.setMapOutputValueClass(NutchWritable.class); + job.setOutputValueClass(NutchDocument.class); JobClient.runJob(job); if (LOG.isInfoEnabled()) { LOG.info("Indexer: done"); } + // clean up + FileSystem fs = FileSystem.get(getConf()); + fs.delete(tempDir); } public static void main(String[] args) throws Exception { @@ -319,19 +329,45 @@ public int run(String[] args) throws Exception { - if (args.length < 4) { - System.err.println("Usage: ..."); + if (args.length < 5) { + System.err.println("Usage: (-lucene ) (-solr )" + + " ..."); + return -1; } - Path[] segments = new Path[args.length-3]; - for (int i = 3; i < args.length; i++) { - segments[i-3] = new Path(args[i]); + Path luceneDir = null; + String solrUrl = null; + Path crawlDb = null; + Path linkDb = null; + ArrayList segments = new ArrayList(); + int i; + for (i = 0; i < args.length; i++) { + if (args[i].equals("-lucene")) { + luceneDir = new Path(args[++i]); + } else if (args[i].equals("-solr")) { + solrUrl = args[++i]; + } else { + break; + } } + if (luceneDir == null && solrUrl == null) { + System.err.println("Usage: (-lucene ) (-solr )" + + " ..."); + + return -1; + } + + crawlDb = new Path(args[i++]); + linkDb = new Path(args[i++]); + + for (; i < args.length; i++) { + segments.add(new Path(args[i])); + } + try { - index(new Path(args[0]), new Path(args[1]), new Path(args[2]), - segments); + index(luceneDir, solrUrl, crawlDb, linkDb, segments); return 0; } catch (Exception e) { LOG.fatal("Indexer: " + StringUtils.stringifyException(e));