> entry = iterator.next();
+ String fieldName = entry.getKey();
+
+ Field.Store store = fieldStore.get(fieldName);
+ Field.Index index = fieldIndex.get(fieldName);
+ Field.TermVector vector = fieldVector.get(fieldName);
+
+ // default values
+ if (store == null) {
+ store = Field.Store.NO;
+ }
+
+ if (index == null) {
+ index = Field.Index.NO;
+ }
+
+ if (vector == null) {
+ vector = Field.TermVector.NO;
+ }
+
+ // read document-level field information
+ String[] fieldMetas =
+ documentMeta.getValues(LuceneConstants.FIELD_PREFIX + fieldName);
+ if (fieldMetas.length != 0) {
+ for (String val : fieldMetas) {
+ if (LuceneConstants.STORE_YES.equals(val)) {
+ store = Field.Store.YES;
+ } else if (LuceneConstants.STORE_NO.equals(val)) {
+ store = Field.Store.NO;
+ } else if (LuceneConstants.INDEX_TOKENIZED.equals(val)) {
+ index = Field.Index.TOKENIZED;
+ } else if (LuceneConstants.INDEX_NO.equals(val)) {
+ index = Field.Index.NO;
+ } else if (LuceneConstants.INDEX_UNTOKENIZED.equals(val)) {
+ index = Field.Index.UN_TOKENIZED;
+ } else if (LuceneConstants.INDEX_NO_NORMS.equals(val)) {
+ index = Field.Index.NO_NORMS;
+ } else if (LuceneConstants.VECTOR_NO.equals(val)) {
+ vector = Field.TermVector.NO;
+ } else if (LuceneConstants.VECTOR_YES.equals(val)) {
+ vector = Field.TermVector.YES;
+ } else if (LuceneConstants.VECTOR_POS.equals(val)) {
+ vector = Field.TermVector.WITH_POSITIONS;
+ } else if (LuceneConstants.VECTOR_POS_OFFSET.equals(val)) {
+ vector = Field.TermVector.WITH_POSITIONS_OFFSETS;
+ } else if (LuceneConstants.VECTOR_OFFSET.equals(val)) {
+ vector = Field.TermVector.WITH_OFFSETS;
+ }
+ }
+ }
+
+ for (String fieldValue : entry.getValue()) {
+ out.add(new Field(fieldName, fieldValue, store, index, vector));
+ }
+ }
+
+ return out;
+ }
+
+   /**
+    * Loads per-field store, index and term-vector options from the
+    * configuration into {@code fieldStore}, {@code fieldIndex} and
+    * {@code fieldVector}.
+    *
+    * Keys that do not start with {@link LuceneConstants#LUCENE_PREFIX} are
+    * ignored. For a matching key the field name is the part of the key that
+    * follows the option prefix, and the configured value is parsed with the
+    * corresponding {@code valueOf}.
+    *
+    * @param conf configuration to scan for field options
+    */
+   @SuppressWarnings("unchecked")
+   private void processOptions(Configuration conf) {
+     for (Iterator entries = conf.iterator(); entries.hasNext();) {
+       String key = (String) ((Map.Entry) entries.next()).getKey();
+       if (!key.startsWith(LuceneConstants.LUCENE_PREFIX)) {
+         continue;
+       }
+
+       if (key.startsWith(LuceneConstants.FIELD_STORE_PREFIX)) {
+         String field =
+             key.substring(LuceneConstants.FIELD_STORE_PREFIX.length());
+         Field.Store luceneStore = null;
+         switch (LuceneWriter.STORE.valueOf(conf.get(key))) {
+           case YES:
+             luceneStore = Field.Store.YES;
+             break;
+           case NO:
+             luceneStore = Field.Store.NO;
+             break;
+           case COMPRESS:
+             luceneStore = Field.Store.COMPRESS;
+             break;
+         }
+         // constants without a mapping above leave the table untouched
+         if (luceneStore != null) {
+           fieldStore.put(field, luceneStore);
+         }
+       } else if (key.startsWith(LuceneConstants.FIELD_INDEX_PREFIX)) {
+         String field =
+             key.substring(LuceneConstants.FIELD_INDEX_PREFIX.length());
+         Field.Index luceneIndex = null;
+         switch (LuceneWriter.INDEX.valueOf(conf.get(key))) {
+           case NO:
+             luceneIndex = Field.Index.NO;
+             break;
+           case NO_NORMS:
+             luceneIndex = Field.Index.NO_NORMS;
+             break;
+           case TOKENIZED:
+             luceneIndex = Field.Index.TOKENIZED;
+             break;
+           case UNTOKENIZED:
+             luceneIndex = Field.Index.UN_TOKENIZED;
+             break;
+         }
+         if (luceneIndex != null) {
+           fieldIndex.put(field, luceneIndex);
+         }
+       } else if (key.startsWith(LuceneConstants.FIELD_VECTOR_PREFIX)) {
+         String field =
+             key.substring(LuceneConstants.FIELD_VECTOR_PREFIX.length());
+         Field.TermVector luceneVector = null;
+         switch (LuceneWriter.VECTOR.valueOf(conf.get(key))) {
+           case NO:
+             luceneVector = Field.TermVector.NO;
+             break;
+           case OFFSET:
+             luceneVector = Field.TermVector.WITH_OFFSETS;
+             break;
+           case POS:
+             luceneVector = Field.TermVector.WITH_POSITIONS;
+             break;
+           case POS_OFFSET:
+             luceneVector = Field.TermVector.WITH_POSITIONS_OFFSETS;
+             break;
+           case YES:
+             luceneVector = Field.TermVector.YES;
+             break;
+         }
+         if (luceneVector != null) {
+           fieldVector.put(field, luceneVector);
+         }
+       }
+     }
+   }
+
+   /**
+    * Optimizes and closes the local index, then publishes it.
+    *
+    * The writer is closed even when optimizing fails, so it is not left
+    * open on error. The local output is only completed, and the done marker
+    * only created, after the writer was closed without an exception.
+    *
+    * @throws IOException if optimizing, closing or publishing the index fails
+    */
+   public void close() throws IOException {
+     try {
+       writer.optimize();
+     } finally {
+       writer.close();
+     }
+     fs.completeLocalOutput(perm, temp); // copy to dfs
+     fs.createNewFile(new Path(perm, Indexer.DONE_NAME));
+   }
+
+   /**
+    * Prepares this writer: resolves the output paths, removes any previous
+    * output, creates the index writer on the local output directory and
+    * applies the indexer settings and per-field options from the
+    * configuration.
+    *
+    * @param conf configuration holding output locations and indexer settings
+    * @throws IOException if the file system or the index writer cannot be set up
+    */
+   public void open(Configuration conf) throws IOException {
+     fs = FileSystem.get(conf);
+     perm = new Path(conf.get(LuceneConstants.OUTPUT_DIR));
+     temp = new Path(conf.get(LuceneConstants.TEMP_OUTPUT_DIR));
+
+     fs.delete(perm); // delete old, if any
+     analyzerFactory = new AnalyzerFactory(conf);
+
+     final String localIndexDir = fs.startLocalOutput(perm, temp).toString();
+     writer = new IndexWriter(localIndexDir, new NutchDocumentAnalyzer(conf),
+         true);
+
+     final int mergeFactor = conf.getInt("indexer.mergeFactor", 10);
+     final int minMergeDocs = conf.getInt("indexer.minMergeDocs", 100);
+     final int maxMergeDocs =
+         conf.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE);
+     final int termIndexInterval = conf.getInt("indexer.termIndexInterval", 128);
+     final int maxTokens = conf.getInt("indexer.max.tokens", 10000);
+
+     writer.setMergeFactor(mergeFactor);
+     writer.setMaxBufferedDocs(minMergeDocs);
+     writer.setMaxMergeDocs(maxMergeDocs);
+     writer.setTermIndexInterval(termIndexInterval);
+     writer.setMaxFieldLength(maxTokens);
+     writer.setInfoStream(LogUtil.getDebugStream(Indexer.LOG));
+     writer.setUseCompoundFile(false);
+     writer.setSimilarity(new NutchSimilarity());
+
+     processOptions(conf);
+   }
+
+   /**
+    * Converts the document to a Lucene document and adds it to the index,
+    * using the analyzer selected by the document's {@code lang} field.
+    *
+    * @param doc document to index
+    * @throws IOException if the document cannot be added to the index
+    */
+   public void write(NutchDocument doc) throws IOException {
+     final Document luceneDoc = createLuceneDoc(doc);
+     final String lang = luceneDoc.get("lang");
+     final NutchAnalyzer analyzer = analyzerFactory.get(lang);
+
+     if (Indexer.LOG.isDebugEnabled()) {
+       Indexer.LOG.debug("Indexing [" + luceneDoc.get("url")
+           + "] with analyzer " + analyzer + " (" + lang + ")");
+     }
+
+     writer.addDocument(luceneDoc, analyzer);
+   }
+
+   /**
+    * Records the options of a Lucene field in the document's metadata.
+    *
+    * The store, index and term-vector settings of {@code f} are added as
+    * values under the key {@code LuceneConstants.FIELD_PREFIX + f.name()}.
+    *
+    * This method is provided for backward-compatibility with
+    * older indexing filters. This should not be used by newer
+    * implementations since this is slower than
+    * {@link NutchDocument#add(String, String)} and will be removed
+    * in a future release.
+    *
+    * NOTE(review): only the field options are recorded here; the field's
+    * value is not added to {@code doc} by this method. Confirm that callers
+    * add the value themselves.
+    *
+    * @param doc document whose metadata receives the field options
+    * @param f Lucene field whose options are recorded
+    * @deprecated Use {@link NutchDocument#add(String, String)} instead and
+    * set index-level metadata for field information.
+    */
+   @Deprecated
+   public static void add(NutchDocument doc, Field f) {
+     String fieldName = f.name();
+     String key = LuceneConstants.FIELD_PREFIX + fieldName;
+     Metadata documentMeta = doc.getDocumentMeta();
+
+     // A compressed field is also reported as stored, so compression has to
+     // be tested first; otherwise STORE_COMPRESS could never be recorded.
+     if (f.isCompressed()) {
+       documentMeta.add(key, LuceneConstants.STORE_COMPRESS);
+     } else if (f.isStored()) {
+       documentMeta.add(key, LuceneConstants.STORE_YES);
+     } else {
+       documentMeta.add(key, LuceneConstants.STORE_NO);
+     }
+
+     if (f.isIndexed()) {
+       if (f.isTokenized()) {
+         documentMeta.add(key, LuceneConstants.INDEX_TOKENIZED);
+       } else if (f.getOmitNorms()) {
+         documentMeta.add(key, LuceneConstants.INDEX_NO_NORMS);
+       } else {
+         documentMeta.add(key, LuceneConstants.INDEX_UNTOKENIZED);
+       }
+     } else {
+       documentMeta.add(key, LuceneConstants.INDEX_NO);
+     }
+
+     // Most specific term-vector setting first.
+     if (f.isStoreOffsetWithTermVector() && f.isStorePositionWithTermVector()) {
+       documentMeta.add(key, LuceneConstants.VECTOR_POS_OFFSET);
+     } else if (f.isStoreOffsetWithTermVector()) {
+       documentMeta.add(key, LuceneConstants.VECTOR_OFFSET);
+     } else if (f.isStorePositionWithTermVector()) {
+       documentMeta.add(key, LuceneConstants.VECTOR_POS);
+     } else if (f.isTermVectorStored()) {
+       documentMeta.add(key, LuceneConstants.VECTOR_YES);
+     } else {
+       documentMeta.add(key, LuceneConstants.VECTOR_NO);
+     }
+   }
+
+   /**
+    * Stores the store, index and term-vector options of a field in the
+    * configuration, each under its option prefix followed by the field name.
+    *
+    * @param field name of the field the options apply to
+    * @param store store option
+    * @param index index option
+    * @param vector term-vector option
+    * @param conf configuration that receives the three settings
+    */
+   public static void addFieldOptions(String field, LuceneWriter.STORE store,
+       LuceneWriter.INDEX index, LuceneWriter.VECTOR vector, Configuration conf) {
+     final String storeKey = LuceneConstants.FIELD_STORE_PREFIX + field;
+     final String indexKey = LuceneConstants.FIELD_INDEX_PREFIX + field;
+     final String vectorKey = LuceneConstants.FIELD_VECTOR_PREFIX + field;
+
+     conf.set(storeKey, store.toString());
+     conf.set(indexKey, index.toString());
+     conf.set(vectorKey, vector.toString());
+   }
+
+   /**
+    * Stores the store and index options of a field in the configuration,
+    * with the term-vector option set to {@code LuceneWriter.VECTOR.NO}.
+    *
+    * @param field name of the field the options apply to
+    * @param store store option
+    * @param index index option
+    * @param conf configuration that receives the settings
+    */
+   public static void addFieldOptions(String field, LuceneWriter.STORE store,
+       LuceneWriter.INDEX index, Configuration conf) {
+     final LuceneWriter.VECTOR noVector = LuceneWriter.VECTOR.NO;
+     LuceneWriter.addFieldOptions(field, store, index, noVector, conf);
+   }
+}
Index: src/java/org/apache/nutch/indexer/NutchDocument.java
===================================================================
--- src/java/org/apache/nutch/indexer/NutchDocument.java (revision 0)
+++ src/java/org/apache/nutch/indexer/NutchDocument.java (revision 0)
@@ -0,0 +1,123 @@
+package org.apache.nutch.indexer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.nutch.metadata.Metadata;
+
+ /**
+  * A {@link NutchDocument} is the unit of indexing.
+  *
+  * It holds multi-valued string fields keyed by field name, a score and
+  * document-level {@link Metadata}, and is serialized as a Hadoop
+  * {@link Writable}.
+  */
+ public class NutchDocument implements Writable {
+
+   /** Serialization version, written as the first byte of every document. */
+   public static final byte VERSION = 1;
+
+   /** Field name to the values added under that name, in insertion order. */
+   private final Map<String, List<String>> fields;
+
+   private Metadata documentMeta;
+
+   private float score;
+
+   /** Creates an empty document with no fields, empty metadata and score 0. */
+   public NutchDocument() {
+     fields = new HashMap<String, List<String>>();
+     documentMeta = new Metadata();
+     score = 0.0f;
+   }
+
+   /**
+    * Appends a value to the named field, creating the field if needed.
+    *
+    * @param name field name
+    * @param value value to append
+    */
+   public void add(String name, String value) {
+     List<String> fieldValues = fields.get(name);
+     if (fieldValues == null) {
+       fieldValues = new ArrayList<String>();
+       fields.put(name, fieldValues);
+     }
+     fieldValues.add(value);
+   }
+
+   /**
+    * Returns the first value of the named field.
+    *
+    * @param name field name
+    * @return the first value, or {@code null} if the field is absent or empty
+    */
+   public String getFieldValue(String name) {
+     List<String> fieldValues = fields.get(name);
+     if (fieldValues == null || fieldValues.isEmpty()) {
+       return null;
+     }
+     return fieldValues.get(0);
+   }
+
+   /**
+    * Returns the values of the named field.
+    *
+    * @param name field name
+    * @return the internal value list, or {@code null} if the field is absent
+    */
+   public List<String> getFieldValues(String name) {
+     return fields.get(name);
+   }
+
+   /**
+    * Removes the named field.
+    *
+    * @param name field name
+    * @return the removed values, or {@code null} if the field was absent
+    */
+   public List<String> removeField(String name) {
+     return fields.remove(name);
+   }
+
+   /** Returns the names of all fields, as a view of the internal key set. */
+   public Collection<String> getFieldNames() {
+     return fields.keySet();
+   }
+
+   /** Iterate over all fields. */
+   public Iterator<Map.Entry<String, List<String>>> fieldIterator() {
+     return fields.entrySet().iterator();
+   }
+
+   public float getScore() {
+     return score;
+   }
+
+   public void setScore(float score) {
+     this.score = score;
+   }
+
+   public Metadata getDocumentMeta() {
+     return documentMeta;
+   }
+
+   /**
+    * Replaces the content of this document with the serialized form read
+    * from {@code in}.
+    *
+    * Previously held fields and metadata are discarded first, so an instance
+    * that is reused for several records does not keep state from an earlier
+    * record.
+    *
+    * @param in input to read from
+    * @throws VersionMismatchException if the first byte is not {@link #VERSION}
+    * @throws IOException if reading fails
+    */
+   public void readFields(DataInput in) throws IOException {
+     byte version = in.readByte();
+     if (version != VERSION) {
+       throw new VersionMismatchException(VERSION, version);
+     }
+
+     fields.clear();
+     int size = WritableUtils.readVInt(in);
+     for (int i = 0; i < size; i++) {
+       String name = Text.readString(in);
+       int numValues = WritableUtils.readVInt(in);
+       List<String> values = new ArrayList<String>();
+       for (int j = 0; j < numValues; j++) {
+         values.add(Text.readString(in));
+       }
+       fields.put(name, values);
+     }
+     score = in.readFloat();
+
+     // start from empty metadata rather than merging into the previous one
+     documentMeta = new Metadata();
+     documentMeta.readFields(in);
+   }
+
+   /**
+    * Writes the version byte, all fields with their values, the score and
+    * the document metadata to {@code out}.
+    *
+    * @param out output to write to
+    * @throws IOException if writing fails
+    */
+   public void write(DataOutput out) throws IOException {
+     out.writeByte(VERSION);
+     WritableUtils.writeVInt(out, fields.size());
+     for (Map.Entry<String, List<String>> entry : fields.entrySet()) {
+       Text.writeString(out, entry.getKey());
+       List<String> values = entry.getValue();
+       WritableUtils.writeVInt(out, values.size());
+       for (String value : values) {
+         Text.writeString(out, value);
+       }
+     }
+     out.writeFloat(score);
+     documentMeta.write(out);
+   }
+
+ }
Index: src/java/org/apache/nutch/indexer/NutchIndexWriter.java
===================================================================
--- src/java/org/apache/nutch/indexer/NutchIndexWriter.java (revision 0)
+++ src/java/org/apache/nutch/indexer/NutchIndexWriter.java (revision 0)
@@ -0,0 +1,14 @@
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+ /**
+  * Destination that {@link NutchDocument}s are written to, with an
+  * open / write / close life cycle.
+  */
+ public interface NutchIndexWriter {
+
+   /**
+    * Prepares the writer.
+    *
+    * @param conf configuration for the writer
+    * @throws IOException if the writer cannot be opened
+    */
+   void open(Configuration conf) throws IOException;
+
+   /**
+    * Writes one document.
+    *
+    * @param doc document to write
+    * @throws IOException if the document cannot be written
+    */
+   void write(NutchDocument doc) throws IOException;
+
+   /**
+    * Finishes writing.
+    *
+    * @throws IOException if closing fails
+    */
+   void close() throws IOException;
+
+ }
Index: src/java/org/apache/nutch/indexer/solr/SolrConstants.java
===================================================================
--- src/java/org/apache/nutch/indexer/solr/SolrConstants.java (revision 0)
+++ src/java/org/apache/nutch/indexer/solr/SolrConstants.java (revision 0)
@@ -0,0 +1,8 @@
+package org.apache.nutch.indexer.solr;
+
+ /** Configuration keys used by the Solr indexing back end. */
+ public interface SolrConstants {
+
+   /** Common prefix of the Solr configuration keys. */
+   String SOLR_PREFIX = "solr.";
+
+   /** Configuration key {@code solr.server.url}. */
+   String SERVER_URL = SOLR_PREFIX + "server.url";
+
+ }
Index: src/java/org/apache/nutch/indexer/solr/SolrWriter.java
===================================================================
--- src/java/org/apache/nutch/indexer/solr/SolrWriter.java (revision 0)
+++ src/java/org/apache/nutch/indexer/solr/SolrWriter.java (revision 0)
@@ -0,0 +1,27 @@
+package org.apache.nutch.indexer.solr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.NutchIndexWriter;
+import org.apache.nutch.util.solr.SolrClient;
+
+ /**
+  * {@link NutchIndexWriter} that hands every document to a
+  * {@link SolrClient}.
+  */
+ public class SolrWriter implements NutchIndexWriter {
+
+   /** Client created in {@link #open(Configuration)}. */
+   private SolrClient client;
+
+   /**
+    * Creates the Solr client from the configuration.
+    *
+    * @param conf configuration passed to the client
+    * @throws IOException declared by the interface
+    */
+   public void open(Configuration conf) throws IOException {
+     client = new SolrClient(conf);
+   }
+
+   /**
+    * Passes the document to the Solr client.
+    *
+    * @param doc document to add
+    * @throws IOException if the client fails to add the document
+    */
+   public void write(NutchDocument doc) throws IOException {
+     client.addDocument(doc);
+   }
+
+   /**
+    * Calls {@code commitAndOptimize(true)} on the client.
+    *
+    * @throws IOException if the client call fails
+    */
+   public void close() throws IOException {
+     client.commitAndOptimize(true);
+   }
+
+ }
Index: src/java/org/apache/nutch/scoring/ScoringFilter.java
===================================================================
--- src/java/org/apache/nutch/scoring/ScoringFilter.java (revision 681741)
+++ src/java/org/apache/nutch/scoring/ScoringFilter.java (working copy)
@@ -22,9 +22,9 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.Text;
-import org.apache.lucene.document.Document;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.parse.Parse;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.plugin.Pluggable;
@@ -156,6 +156,6 @@
* other scoring strategies by modifying Lucene document directly.
* @throws ScoringFilterException
*/
- public float indexerScore(Text url, Document doc, CrawlDatum dbDatum,
+ public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException;
}
Index: src/java/org/apache/nutch/scoring/ScoringFilters.java
===================================================================
--- src/java/org/apache/nutch/scoring/ScoringFilters.java (revision 681741)
+++ src/java/org/apache/nutch/scoring/ScoringFilters.java (working copy)
@@ -22,9 +22,9 @@
import java.util.List;
import java.util.Map.Entry;
-import org.apache.lucene.document.Document;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.parse.Parse;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.plugin.Extension;
@@ -138,7 +138,7 @@
return adjust;
}
- public float indexerScore(Text url, Document doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException {
+ public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException {
for (int i = 0; i < this.filters.length; i++) {
initScore = this.filters[i].indexerScore(url, doc, dbDatum, fetchDatum, parse, inlinks, initScore);
}
Index: src/java/org/apache/nutch/searcher/DistributedSearch.java
===================================================================
--- src/java/org/apache/nutch/searcher/DistributedSearch.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/DistributedSearch.java (working copy)
@@ -17,51 +17,22 @@
package org.apache.nutch.searcher;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.StringTokenizer;
-import java.util.TreeSet;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.nutch.crawl.Inlinks;
-import org.apache.nutch.parse.ParseData;
-import org.apache.nutch.parse.ParseText;
import org.apache.nutch.util.NutchConfiguration;
-/** Implements the search API over IPC connnections. */
+/** Search/summary servers. */
public class DistributedSearch {
- public static final Log LOG = LogFactory.getLog(DistributedSearch.class);
private DistributedSearch() {} // no public ctor
- /** The distributed search protocol. */
- public static interface Protocol
- extends Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks, VersionedProtocol {
-
- /** The name of the segments searched by this node. */
- String[] getSegmentNames();
- }
-
- /** The search server. */
- public static class Server {
-
- private Server() {}
-
- /** Runs a search server. */
+ /** Runs a search/summary server. */
+ public static class Server {
public static void main(String[] args) throws Exception {
- String usage = "DistributedSearch$Server ";
+ String usage = "DistributedSearch$Server ";
if (args.length == 0 || args.length > 2) {
System.err.println(usage);
@@ -73,7 +44,8 @@
Configuration conf = NutchConfiguration.create();
- org.apache.hadoop.ipc.Server server = getServer(conf, directory, port);
+ org.apache.hadoop.ipc.Server server =
+ getServer(conf, directory, port);
server.start();
server.join();
}
@@ -78,7 +50,8 @@
server.join();
}
- static org.apache.hadoop.ipc.Server getServer(Configuration conf, Path directory, int port) throws IOException{
+ static org.apache.hadoop.ipc.Server getServer(Configuration conf,
+ Path directory, int port) throws IOException{
NutchBean bean = new NutchBean(conf, directory);
int numHandlers = conf.getInt("searcher.num.handlers", 10);
return RPC.getServer(bean, "0.0.0.0", port, numHandlers, true, conf);
@@ -86,372 +59,50 @@
}
- /** The search client. */
- public static class Client extends Thread
- implements Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks,
- Runnable {
-
- private InetSocketAddress[] defaultAddresses;
- private boolean[] liveServer;
- private HashMap segmentToAddress = new HashMap();
-
- private boolean running = true;
- private Configuration conf;
-
- private Path file;
- private long timestamp;
- private FileSystem fs;
-
- /** Construct a client talking to servers listed in the named file.
- * Each line in the file lists a server hostname and port, separated by
- * whitespace.
- */
- public Client(Path file, Configuration conf)
- throws IOException {
- this(readConfig(file, conf), conf);
- this.file = file;
- this.timestamp = fs.getFileStatus(file).getModificationTime();
- }
-
- private static InetSocketAddress[] readConfig(Path path, Configuration conf)
- throws IOException {
- FileSystem fs = FileSystem.get(conf);
- BufferedReader reader =
- new BufferedReader(new InputStreamReader(fs.open(path)));
- try {
- ArrayList addrs = new ArrayList();
- String line;
- while ((line = reader.readLine()) != null) {
- StringTokenizer tokens = new StringTokenizer(line);
- if (tokens.hasMoreTokens()) {
- String host = tokens.nextToken();
- if (tokens.hasMoreTokens()) {
- String port = tokens.nextToken();
- addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
- if (LOG.isInfoEnabled()) {
- LOG.info("Client adding server " + host + ":" + port);
- }
- }
- }
- }
- return (InetSocketAddress[])
- addrs.toArray(new InetSocketAddress[addrs.size()]);
- } finally {
- reader.close();
- }
- }
-
- /** Construct a client talking to the named servers. */
- public Client(InetSocketAddress[] addresses, Configuration conf) throws IOException {
- this.conf = conf;
- this.defaultAddresses = addresses;
- this.liveServer = new boolean[addresses.length];
- this.fs = FileSystem.get(conf);
- updateSegments();
- setDaemon(true);
- start();
- }
-
- private static final Method GET_SEGMENTS;
- private static final Method SEARCH;
- private static final Method DETAILS;
- private static final Method SUMMARY;
- static {
- try {
- GET_SEGMENTS = Protocol.class.getMethod
- ("getSegmentNames", new Class[] {});
- SEARCH = Protocol.class.getMethod
- ("search", new Class[] { Query.class, Integer.TYPE, String.class,
- String.class, Boolean.TYPE});
- DETAILS = Protocol.class.getMethod
- ("getDetails", new Class[] { Hit.class});
- SUMMARY = Protocol.class.getMethod
- ("getSummary", new Class[] { HitDetails.class, Query.class});
- } catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Check to see if search-servers file has been modified
- *
- * @throws IOException
- */
- public boolean isFileModified()
- throws IOException {
-
- if (file != null) {
- long modTime = fs.getFileStatus(file).getModificationTime();
- if (timestamp < modTime) {
- this.timestamp = fs.getFileStatus(file).getModificationTime();
- return true;
- }
- }
-
- return false;
- }
-
- /** Updates segment names.
- *
- * @throws IOException
- */
- public void updateSegments() throws IOException {
-
- int liveServers = 0;
- int liveSegments = 0;
-
- if (isFileModified()) {
- defaultAddresses = readConfig(file, conf);
- }
-
- // Create new array of flags so they can all be updated at once.
- boolean[] updatedLiveServer = new boolean[defaultAddresses.length];
-
- // build segmentToAddress map
- Object[][] params = new Object[defaultAddresses.length][0];
- String[][] results =
- (String[][])RPC.call(GET_SEGMENTS, params, defaultAddresses, this.conf);
-
- for (int i = 0; i < results.length; i++) { // process results of call
- InetSocketAddress addr = defaultAddresses[i];
- String[] segments = results[i];
- if (segments == null) {
- updatedLiveServer[i] = false;
- if (LOG.isWarnEnabled()) {
- LOG.warn("Client: no segments from: " + addr);
- }
- continue;
- }
-
- for (int j = 0; j < segments.length; j++) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Client: segment "+segments[j]+" at "+addr);
- }
- segmentToAddress.put(segments[j], addr);
- }
-
- updatedLiveServer[i] = true;
- liveServers++;
- liveSegments += segments.length;
- }
-
- // Now update live server flags.
- this.liveServer = updatedLiveServer;
-
- if (LOG.isInfoEnabled()) {
- LOG.info("STATS: "+liveServers+" servers, "+liveSegments+" segments.");
- }
- }
-
- /** Return the names of segments searched. */
- public String[] getSegmentNames() {
- return (String[])
- segmentToAddress.keySet().toArray(new String[segmentToAddress.size()]);
- }
-
- public Hits search(final Query query, final int numHits,
- final String dedupField, final String sortField,
- final boolean reverse) throws IOException {
- // Get the list of live servers. It would be nice to build this
- // list in updateSegments(), but that would create concurrency issues.
- // We grab a local reference to the live server flags in case it
- // is updated while we are building our list of liveAddresses.
- boolean[] savedLiveServer = this.liveServer;
- int numLive = 0;
- for (int i = 0; i < savedLiveServer.length; i++) {
- if (savedLiveServer[i])
- numLive++;
- }
- InetSocketAddress[] liveAddresses = new InetSocketAddress[numLive];
- int[] liveIndexNos = new int[numLive];
- int k = 0;
- for (int i = 0; i < savedLiveServer.length; i++) {
- if (savedLiveServer[i]) {
- liveAddresses[k] = defaultAddresses[i];
- liveIndexNos[k] = i;
- k++;
- }
+ public static class IndexServer {
+ /** Runs a lucene search server. */
+ public static void main(String[] args) throws Exception {
+ final String usage = "DistributedSearch$IndexServer ";
+ if (args.length == 0 || args.length > 2) {
+ System.err.println(usage);
+ System.exit(-1);
}
- Object[][] params = new Object[liveAddresses.length][5];
- for (int i = 0; i < params.length; i++) {
- params[i][0] = query;
- params[i][1] = new Integer(numHits);
- params[i][2] = dedupField;
- params[i][3] = sortField;
- params[i][4] = Boolean.valueOf(reverse);
- }
- Hits[] results = (Hits[])RPC.call(SEARCH, params, liveAddresses, this.conf);
-
- TreeSet queue; // cull top hits from results
-
- if (sortField == null || reverse) {
- queue = new TreeSet(new Comparator() {
- public int compare(Object o1, Object o2) {
- return ((Comparable)o2).compareTo(o1); // reverse natural order
- }
- });
- } else {
- queue = new TreeSet();
- }
-
- long totalHits = 0;
- Comparable maxValue = null;
- for (int i = 0; i < results.length; i++) {
- Hits hits = results[i];
- if (hits == null) continue;
- totalHits += hits.getTotal();
- for (int j = 0; j < hits.getLength(); j++) {
- Hit h = hits.getHit(j);
- if (maxValue == null ||
- ((reverse || sortField == null)
- ? h.getSortValue().compareTo(maxValue) >= 0
- : h.getSortValue().compareTo(maxValue) <= 0)) {
- queue.add(new Hit(liveIndexNos[i], h.getIndexDocNo(),
- h.getSortValue(), h.getDedupValue()));
- if (queue.size() > numHits) { // if hit queue overfull
- queue.remove(queue.last()); // remove lowest in hit queue
- maxValue = ((Hit)queue.last()).getSortValue(); // reset maxValue
- }
- }
- }
- }
- return new Hits(totalHits, (Hit[])queue.toArray(new Hit[queue.size()]));
- }
-
- // version for hadoop-0.5.0.jar
- public static final long versionID = 1L;
-
- private Protocol getRemote(Hit hit) throws IOException {
- return (Protocol)
- RPC.getProxy(Protocol.class, versionID, defaultAddresses[hit.getIndexNo()], conf);
- }
+ int port = Integer.parseInt(args[0]);
+ Path dir = new Path(args[1]);
- private Protocol getRemote(HitDetails hit) throws IOException {
- InetSocketAddress address =
- (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
- return (Protocol)RPC.getProxy(Protocol.class, versionID, address, conf);
- }
+ Configuration conf = NutchConfiguration.create();
- public String getExplanation(Query query, Hit hit) throws IOException {
- return getRemote(hit).getExplanation(query, hit);
+ LuceneSearchBean bean = new LuceneSearchBean(conf,
+ new Path(dir, "index"), new Path(dir, "indexes"));
+ org.apache.hadoop.ipc.RPC.Server server =
+ RPC.getServer(bean, "0.0.0.0", port, 10, false, conf);
+ server.start();
+ server.join();
}
-
- public HitDetails getDetails(Hit hit) throws IOException {
- return getRemote(hit).getDetails(hit);
- }
-
- public HitDetails[] getDetails(Hit[] hits) throws IOException {
- InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
- Object[][] params = new Object[hits.length][1];
- for (int i = 0; i < hits.length; i++) {
- addrs[i] = defaultAddresses[hits[i].getIndexNo()];
- params[i][0] = hits[i];
- }
- return (HitDetails[])RPC.call(DETAILS, params, addrs, conf);
- }
-
-
- public Summary getSummary(HitDetails hit, Query query) throws IOException {
- return getRemote(hit).getSummary(hit, query);
- }
-
- public Summary[] getSummary(HitDetails[] hits, Query query)
- throws IOException {
- InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
- Object[][] params = new Object[hits.length][2];
- for (int i = 0; i < hits.length; i++) {
- HitDetails hit = hits[i];
- addrs[i] =
- (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
- params[i][0] = hit;
- params[i][1] = query;
+ }
+
+ public static class SegmentServer {
+ /** Runs a summary server. */
+ public static void main(String[] args) throws Exception {
+ final String usage =
+ "DistributedSearch$SegmentServer ";
+ if (args.length < 2) {
+ System.err.println(usage);
+ System.exit(1);
}
- return (Summary[])RPC.call(SUMMARY, params, addrs, conf);
- }
-
- public byte[] getContent(HitDetails hit) throws IOException {
- return getRemote(hit).getContent(hit);
- }
-
- public ParseData getParseData(HitDetails hit) throws IOException {
- return getRemote(hit).getParseData(hit);
- }
- public ParseText getParseText(HitDetails hit) throws IOException {
- return getRemote(hit).getParseText(hit);
- }
+ Configuration conf = NutchConfiguration.create();
+ int port = Integer.parseInt(args[0]);
+ Path segmentsDir = new Path(args[1], "segments");
- public String[] getAnchors(HitDetails hit) throws IOException {
- return getRemote(hit).getAnchors(hit);
- }
-
- public Inlinks getInlinks(HitDetails hit) throws IOException {
- return getRemote(hit).getInlinks(hit);
- }
-
- public long getFetchDate(HitDetails hit) throws IOException {
- return getRemote(hit).getFetchDate(hit);
- }
+ FetchedSegments segments = new FetchedSegments(conf, segmentsDir);
- public static void main(String[] args) throws Exception {
- String usage = "DistributedSearch$Client query ...";
-
- if (args.length == 0) {
- System.err.println(usage);
- System.exit(-1);
- }
-
- Query query = Query.parse(args[0], NutchConfiguration.create());
+ org.apache.hadoop.ipc.RPC.Server server =
+ RPC.getServer(segments, "0.0.0.0", port, conf);
- InetSocketAddress[] addresses = new InetSocketAddress[(args.length-1)/2];
- for (int i = 0; i < (args.length-1)/2; i++) {
- addresses[i] =
- new InetSocketAddress(args[i*2+1], Integer.parseInt(args[i*2+2]));
- }
-
- Client client = new Client(addresses, NutchConfiguration.create());
- //client.setTimeout(Integer.MAX_VALUE);
-
- Hits hits = client.search(query, 10, null, null, false);
- System.out.println("Total hits: " + hits.getTotal());
- for (int i = 0; i < hits.getLength(); i++) {
- System.out.println(" "+i+" "+ client.getDetails(hits.getHit(i)));
- }
-
- }
-
- public void run() {
- while (running){
- try{
- Thread.sleep(10000);
- } catch (InterruptedException ie){
- if (LOG.isInfoEnabled()) {
- LOG.info("Thread sleep interrupted.");
- }
- }
- try{
- if (LOG.isInfoEnabled()) {
- LOG.info("Querying segments from search servers...");
- }
- updateSegments();
- } catch (IOException ioe) {
- if (LOG.isWarnEnabled()) { LOG.warn("No search servers available!"); }
- liveServer = new boolean[defaultAddresses.length];
- }
- }
- }
-
- /**
- * Stops the watchdog thread.
- */
- public void close() {
- running = false;
- interrupt();
- }
-
- public boolean[] getLiveServer() {
- return liveServer;
+ server.start();
+ server.join();
}
}
}
\ No newline at end of file
Index: src/java/org/apache/nutch/searcher/DistributedSearchBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/DistributedSearchBean.java (revision 0)
+++ src/java/org/apache/nutch/searcher/DistributedSearchBean.java (revision 0)
@@ -0,0 +1,326 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.StringUtils;
+
+public class DistributedSearchBean implements SearchBean {
+
+ private static final ExecutorService executor =
+ Executors.newCachedThreadPool();
+
+ private final ScheduledExecutorService pingService;
+
+  /** Searches {@code beans[id]} with the arguments last passed to
+   *  {@code setSearchArgs}; yields null when the server is marked down. */
+  private class SearchTask implements Callable<Hits> {
+    private int id;
+    private Query query;
+    private int numHits;
+    private String dedupField;
+    private String sortField;
+    private boolean reverse;
+
+    public SearchTask(int id) {
+      this.id = id;
+    }
+
+    public Hits call() throws Exception {
+      if (!liveServers[id]) {
+        return null;
+      }
+      return beans[id].search(query, numHits, dedupField, sortField, reverse);
+    }
+
+    public void setSearchArgs(Query query, int numHits, String dedupField,
+                              String sortField, boolean reverse) {
+      this.query = query;
+      this.numHits = numHits;
+      this.dedupField = dedupField;
+      this.sortField = sortField;
+      this.reverse = reverse;
+    }
+  }
+
+  /** Fetches details from {@code beans[id]} for the hits last passed to
+   *  {@code setHits}; yields null when there are none for this server. */
+  private class DetailTask implements Callable<HitDetails[]> {
+    private int id;
+    private Hit[] hits;
+
+    public DetailTask(int id) {
+      this.id = id;
+    }
+
+    public HitDetails[] call() throws Exception {
+      if (hits == null) {
+        return null;
+      }
+      return beans[id].getDetails(hits);
+    }
+
+    public void setHits(Hit[] hits) {
+      this.hits = hits;
+    }
+  }
+
+  /**
+   * Periodic liveness probe for a single server: records the outcome of
+   * {@code beans[id].ping()} in {@code liveServers[id]}.
+   */
+  private class PingWorker implements Runnable {
+    private int id;
+
+    public PingWorker(int id) { this.id = id; }
+
+    public void run() {
+      try {
+        liveServers[id] = beans[id].ping();
+      } catch (Exception e) {
+        // An exception escaping run() would make scheduleAtFixedRate drop
+        // every later ping for this server, so treat any failure as "down".
+        liveServers[id] = false;
+      }
+    }
+  }
+
+  // NOTE(review): volatile guards the array reference only; element writes
+  // made by the ping workers are not guaranteed visible to search threads.
+  private volatile boolean liveServers[];
+  private SearchBean[] beans;
+  private List<Callable<Hits>> searchTasks;
+  private List<Callable<HitDetails[]>> detailTasks;
+  private List<PingWorker> pingWorkers;
+  private long timeout;
+
+  /**
+   * Builds one bean per server listed in whichever of the two config files
+   * exist, marks every server live, and schedules a ping of each server
+   * every 10 seconds.
+   *
+   * @param conf configuration; "ipc.client.timeout" (default 60000 ms)
+   *        bounds each fan-out call
+   * @param luceneConfig file the Lucene server addresses are read from
+   * @param solrConfig file the Solr server entries are read from
+   * @throws IOException propagated from file-system access or bean setup
+   */
+  public DistributedSearchBean(Configuration conf,
+                               Path luceneConfig, Path solrConfig)
+  throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    this.timeout = conf.getLong("ipc.client.timeout", 60000);
+
+    List<SearchBean> beanList = new ArrayList<SearchBean>();
+    if (fs.exists(luceneConfig)) {
+      addLuceneBeans(beanList, luceneConfig, conf);
+    }
+    if (fs.exists(solrConfig)) {
+      addSolrBeans(beanList, solrConfig, conf);
+    }
+    beans = beanList.toArray(new SearchBean[beanList.size()]);
+
+    liveServers = new boolean[beans.length];
+    Arrays.fill(liveServers, true);
+
+    searchTasks = new ArrayList<Callable<Hits>>(beans.length);
+    detailTasks = new ArrayList<Callable<HitDetails[]>>(beans.length);
+    pingWorkers = new ArrayList<PingWorker>(beans.length);
+    for (int i = 0; i < beans.length; i++) {
+      searchTasks.add(new SearchTask(i));
+      detailTasks.add(new DetailTask(i));
+      pingWorkers.add(new PingWorker(i));
+    }
+
+    pingService = Executors.newScheduledThreadPool(beans.length);
+    for (PingWorker worker : pingWorkers) {
+      pingService.scheduleAtFixedRate(worker, 0, 10, TimeUnit.SECONDS);
+    }
+  }
+
+  /** Adds an RPC proxy to {@code beanList} for every address read from
+   *  {@code luceneConfig}; the proxies do not retry failed connections. */
+  private static void addLuceneBeans(List<SearchBean> beanList,
+                                     Path luceneConfig, Configuration conf)
+  throws IOException {
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt("ipc.client.connect.max.retries", 0);  // do not retry
+
+    List<InetSocketAddress> luceneServers =
+      NutchBean.readAddresses(luceneConfig, conf);
+    for (InetSocketAddress addr : luceneServers) {
+      beanList.add((RPCSearchBean) RPC.getProxy(RPCSearchBean.class,
+          LuceneSearchBean.VERSION, addr, newConf));
+    }
+  }
+
+  /** Adds one SolrSearchBean per server entry read from solrConfig. */
+  private static void addSolrBeans(List<SearchBean> beanList, Path solrConfig,
+                                   Configuration conf) throws IOException {
+    for (String solrServer : NutchBean.readConfig(solrConfig, conf)) {
+      beanList.add(new SolrSearchBean(conf, solrServer));
+    }
+  }
+
+ public String getExplanation(Query query, Hit hit) throws IOException {
+ return beans[hit.getIndexNo()].getExplanation(query, hit);
+ }
+
+  /**
+   * Sends the query to every live server and merges the answers into at
+   * most {@code numHits} hits, each tagged with its server's index. Servers
+   * that fail or exceed the timeout are skipped. NOTE(review): the shared
+   * task objects are mutated per call; concurrent searches look unsafe.
+   */
+  public Hits search(Query query, int numHits, String dedupField,
+                     String sortField, boolean reverse) throws IOException {
+    for (Callable<Hits> task : searchTasks) {
+      ((SearchTask)task).setSearchArgs(query, numHits, dedupField, sortField,
+                                       reverse);
+    }
+
+    List<Future<Hits>> allHits;
+    try {
+      allHits = executor.invokeAll(searchTasks, timeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();     // keep the interrupt visible
+      throw new RuntimeException(e);
+    }
+
+    int capacity = Math.max(1, numHits);  // PriorityQueue rejects capacity < 1
+    PriorityQueue<Hit> queue;             // cull top hits from results
+    if (sortField == null || reverse) {
+      queue = new PriorityQueue<Hit>(capacity);
+    } else {
+      queue = new PriorityQueue<Hit>(capacity, new Comparator<Hit>() {
+        public int compare(Hit h1, Hit h2) {
+          return h2.compareTo(h1);            // reverse natural order
+        }
+      });
+    }
+
+    long totalHits = 0;
+    for (int i = 0; i < allHits.size(); i++) {
+      Hits hits = null;
+      try {
+        hits = allHits.get(i).get();
+      } catch (java.util.concurrent.CancellationException e) {
+        LOG.warn("Search on server " + i + " timed out"); // cancelled by invokeAll
+      } catch (InterruptedException e) {      // ignore
+      } catch (ExecutionException e) {
+        LOG.warn("Retrieving hits failed with exception: " +
+                 StringUtils.stringifyException(e.getCause()));
+      }
+      if (hits == null) {
+        continue;
+      }
+      totalHits += hits.getTotal();
+      for (int j = 0; j < hits.getLength(); j++) {
+        Hit hit = hits.getHit(j);
+        queue.add(new Hit(i, hit.getUniqueKey(),
+                          hit.getSortValue(), hit.getDedupValue()));
+        if (queue.size() > numHits) {         // if hit queue overfull
+          queue.remove();
+        }
+      }
+    }
+
+    // we have to sort results since PriorityQueue.toArray
+    // may not return results in sorted order
+    Hit[] culledResults = queue.toArray(new Hit[queue.size()]);
+    Arrays.sort(culledResults, Collections.reverseOrder(queue.comparator()));
+    return new Hits(totalHits, culledResults);
+  }
+
+ public void close() throws IOException {
+ executor.shutdown();
+ pingService.shutdown();
+ for (SearchBean bean : beans) {
+ bean.close();
+ }
+ }
+
+ public HitDetails getDetails(Hit hit) throws IOException {
+ return beans[hit.getIndexNo()].getDetails(hit);
+ }
+
+  /**
+   * Fetches details for many hits at once: hits are grouped by server, each
+   * group is fetched in parallel, and the answers are put back in the order
+   * of {@code hits}.
+   * @throws IOException if a server call fails or exceeds the timeout
+   */
+  @SuppressWarnings("unchecked")
+  public HitDetails[] getDetails(Hit[] hits) throws IOException {
+    List<Hit>[] hitList = new ArrayList[detailTasks.size()];
+    for (int i = 0; i < hitList.length; i++) {
+      hitList[i] = new ArrayList<Hit>();
+    }
+    for (Hit hit : hits) {
+      hitList[hit.getIndexNo()].add(hit);
+    }
+    for (int i = 0; i < detailTasks.size(); i++) {
+      DetailTask task = (DetailTask)detailTasks.get(i);
+      task.setHits(hitList[i].isEmpty()
+          ? null : hitList[i].toArray(new Hit[hitList[i].size()]));
+    }
+
+    List<Future<HitDetails[]>> allDetails;
+    try {
+      allDetails = executor.invokeAll(detailTasks, timeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();     // keep the interrupt visible
+      throw new RuntimeException(e);
+    }
+
+    // getDetails(Hit[]) callers assume that HitDetails[i] corresponds to
+    // Hit[i]. To keep this order, we have to 'merge' the HitDetails[]
+    // returned from individual detailTasks.
+    HitDetails[][] detailsMatrix = new HitDetails[detailTasks.size()][];
+    for (int i = 0; i < detailsMatrix.length; i++) {
+      try {
+        detailsMatrix[i] = allDetails.get(i).get();
+      } catch (java.util.concurrent.CancellationException e) {
+        // invokeAll cancels tasks that are still running at the timeout
+        throw new IOException("Timed out fetching details from server " + i);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new RuntimeException(e);
+      }
+    }
+
+    int[] hitPos = new int[detailTasks.size()]; // keep track of where we are
+    HitDetails[] detailsArr = new HitDetails[hits.length];
+    for (int i = 0; i < detailsArr.length; i++) {
+      int indexNo = hits[i].getIndexNo();
+      detailsArr[i] = detailsMatrix[indexNo][(hitPos[indexNo]++)];
+    }
+    return detailsArr;
+  }
+
+ public boolean ping() {
+ return true; // not used
+ }
+
+}
Index: src/java/org/apache/nutch/searcher/DistributedSegmentBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/DistributedSegmentBean.java (revision 0)
+++ src/java/org/apache/nutch/searcher/DistributedSegmentBean.java (revision 0)
@@ -0,0 +1,214 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+
+public class DistributedSegmentBean implements SegmentBean {
+
+ private static final ExecutorService executor =
+ Executors.newCachedThreadPool();
+
+ private final ScheduledExecutorService pingService;
+
+  /** Summarizes, on {@code beans[id]}, the details last passed to
+   *  {@code setSummaryArgs}; yields null when there are none for this bean. */
+  private class DistSummmaryTask implements Callable<Summary[]> {
+    private int id;
+    private HitDetails[] details;
+    private Query query;
+
+    public DistSummmaryTask(int id) {
+      this.id = id;
+    }
+
+    public Summary[] call() throws Exception {
+      if (details == null) {
+        return null;
+      }
+      return beans[id].getSummary(details, query);
+    }
+
+    public void setSummaryArgs(HitDetails[] details, Query query) {
+      this.details = details;
+      this.query = query;
+    }
+  }
+
+  /** Periodic task that asks {@code beans[id]} for its segment names and
+   *  records this bean as their owner; on failure its segments are dropped. */
+  private class SegmentWorker implements Runnable {
+    private int id;
+
+    public SegmentWorker(int id) { this.id = id; }
+
+    public void run() {
+      try {
+        String[] segments = beans[id].getSegmentNames();
+        for (String segment : segments) {
+          segmentMap.put(segment, id);
+        }
+      } catch (Exception e) {
+        // Catch everything: an exception escaping run() would make
+        // scheduleAtFixedRate suppress all later refreshes for this bean.
+        // remove all segments this bean was serving
+        Iterator<Map.Entry<String, Integer>> i =
+          segmentMap.entrySet().iterator();
+        while (i.hasNext()) {
+          if (i.next().getValue() == this.id) {
+            i.remove();
+          }
+        }
+      }
+    }
+  }
+
+  private long timeout;
+  private SegmentBean[] beans;
+  private ConcurrentMap<String, Integer> segmentMap;  // segment name -> bean
+  private List<Callable<Summary[]>> summaryTasks;
+  private List<SegmentWorker> segmentWorkers;
+
+  /**
+   * Opens an RPC proxy to every segment server listed in
+   * {@code serversConfig} and schedules a refresh of each server's segment
+   * list every 30 seconds.
+   *
+   * @param conf configuration; "ipc.client.timeout" (default 60000 ms)
+   *        bounds each batched summary call
+   * @param serversConfig file the server addresses are read from
+   * @throws IOException propagated from reading the list or opening a proxy
+   */
+  public DistributedSegmentBean(Configuration conf, Path serversConfig)
+  throws IOException {
+    this.timeout = conf.getLong("ipc.client.timeout", 60000);
+
+    List<InetSocketAddress> segmentServers =
+      NutchBean.readAddresses(serversConfig, conf);
+    List<SegmentBean> beanList = new ArrayList<SegmentBean>();
+    for (InetSocketAddress addr : segmentServers) {
+      beanList.add((RPCSegmentBean) RPC.getProxy(RPCSegmentBean.class,
+          FetchedSegments.VERSION, addr, conf));
+    }
+    beans = beanList.toArray(new SegmentBean[beanList.size()]);
+
+    summaryTasks = new ArrayList<Callable<Summary[]>>(beans.length);
+    segmentWorkers = new ArrayList<SegmentWorker>(beans.length);
+    for (int i = 0; i < beans.length; i++) {
+      summaryTasks.add(new DistSummmaryTask(i));
+      segmentWorkers.add(new SegmentWorker(i));
+    }
+
+    segmentMap = new ConcurrentHashMap<String, Integer>();
+    pingService = Executors.newScheduledThreadPool(beans.length);
+    for (SegmentWorker worker : segmentWorkers) {
+      pingService.scheduleAtFixedRate(worker, 0, 30, TimeUnit.SECONDS);
+    }
+  }
+
+ private SegmentBean getBean(HitDetails details) {
+ return beans[segmentMap.get(details.getValue("segment"))];
+ }
+
+ public String[] getSegmentNames() {
+ return segmentMap.keySet().toArray(new String[segmentMap.size()]);
+ }
+
+ public byte[] getContent(HitDetails details) throws IOException {
+ return getBean(details).getContent(details);
+ }
+
+ public long getFetchDate(HitDetails details) throws IOException {
+ return getBean(details).getFetchDate(details);
+ }
+
+ public ParseData getParseData(HitDetails details) throws IOException {
+ return getBean(details).getParseData(details);
+ }
+
+ public ParseText getParseText(HitDetails details) throws IOException {
+ return getBean(details).getParseText(details);
+ }
+
+ public void close() throws IOException {
+ executor.shutdown();
+ pingService.shutdown();
+ for (SegmentBean bean : beans) {
+ bean.close();
+ }
+ }
+
+ public Summary getSummary(HitDetails details, Query query)
+ throws IOException {
+ return getBean(details).getSummary(details, query);
+ }
+
+  /** Summarizes every entry on the server holding its segment; result i
+   *  belongs to detailsArr[i]. Unknown segments raise an IOException. */
+  @SuppressWarnings("unchecked")
+  public Summary[] getSummary(HitDetails[] detailsArr, Query query)
+  throws IOException {
+    List<HitDetails>[] detailsList = new ArrayList[summaryTasks.size()];
+    List<Integer>[] positions = new ArrayList[summaryTasks.size()];
+    for (int i = 0; i < detailsList.length; i++) {
+      detailsList[i] = new ArrayList<HitDetails>();
+      positions[i] = new ArrayList<Integer>();  // caller-side index per entry
+    }
+    for (int i = 0; i < detailsArr.length; i++) {
+      String segment = detailsArr[i].getValue("segment");
+      Integer owner = segment == null ? null : segmentMap.get(segment);
+      if (owner == null) {
+        throw new IOException("No server is serving segment " + segment);
+      }
+      detailsList[owner].add(detailsArr[i]);
+      positions[owner].add(i);
+    }
+    for (int i = 0; i < detailsList.length; i++) {
+      List<HitDetails> part = detailsList[i];
+      HitDetails[] arr =
+        part.isEmpty() ? null : part.toArray(new HitDetails[part.size()]);
+      ((DistSummmaryTask)summaryTasks.get(i)).setSummaryArgs(arr,
+          arr == null ? null : query);
+    }
+
+    List<Future<Summary[]>> summaries;
+    try {
+      summaries = executor.invokeAll(summaryTasks, timeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    Summary[] results = new Summary[detailsArr.length];
+    for (int i = 0; i < summaries.size(); i++) {
+      try {
+        Summary[] part = summaries.get(i).get();
+        for (int k = 0; k < positions[i].size(); k++) {
+          results[positions[i].get(k)] = part[k];  // k-th answer -> k-th entry
+        }
+      } catch (Exception e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new RuntimeException(e);
+      }
+    }
+    return results;
+  }
+
+}
Index: src/java/org/apache/nutch/searcher/FetchedSegments.java
===================================================================
--- src/java/org/apache/nutch/searcher/FetchedSegments.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/FetchedSegments.java (working copy)
@@ -19,8 +19,16 @@
import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.*;
@@ -35,7 +43,75 @@
/** Implements {@link HitSummarizer} and {@link HitContent} for a set of
* fetched segments. */
-public class FetchedSegments implements HitSummarizer, HitContent {
+public class FetchedSegments implements RPCSegmentBean {
+
+ public static final long VERSION = 1L;
+
+ private static final ExecutorService executor =
+ Executors.newCachedThreadPool();
+
+ private class SummaryTask implements Callable {
+ private HitDetails details;
+ private Query query;
+
+ public SummaryTask(HitDetails details, Query query) {
+ this.details = details;
+ this.query = query;
+ }
+
+ public Summary call() throws Exception {
+ return getSummary(details, query);
+ }
+ }
+
+  /** Background thread that re-scans {@code segmentsDir} about once a minute,
+   *  dropping segments whose directory is gone and adding new ones. */
+  private class SegmentUpdater extends Thread {
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          FileStatus[] fstats = fs.listStatus(segmentsDir,
+              HadoopFSUtil.getPassDirectoriesFilter(fs));
+          Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
+          Iterator<Map.Entry<String, Segment>> i =
+            segments.entrySet().iterator();
+          while (i.hasNext()) {
+            Segment seg = i.next().getValue();
+            if (!fs.exists(seg.segmentDir)) {
+              try {
+                seg.close();
+              } catch (Exception e) {
+                /* A segment may fail to close since it may already be
+                 * deleted from file system. So we just ignore the
+                 * exception and remove the mapping from 'segments'.
+                 */
+              } finally {
+                i.remove();
+              }
+            }
+          }
+
+          if (segmentDirs != null) {
+            for (Path segmentDir : segmentDirs) {
+              segments.putIfAbsent(segmentDir.getName(),
+                  new Segment(fs, segmentDir, conf));
+            }
+          }
+        } catch (IOException e) {
+          // ignore; the scan is retried after the pause below
+        }
+
+        try {
+          Thread.sleep(60000);   // pause even after a failed scan
+        } catch (InterruptedException e) {
+          return;                // interrupted: stop updating
+        }
+      }
+    }
+  }
+
private static class Segment implements Closeable {
@@ -87,7 +163,7 @@
}
return (ParseText)getEntry(parseText, url, new ParseText());
}
-
+
private MapFile.Reader[] getReaders(String subDir) throws IOException {
return MapFileOutputFormat.getReaders(fs, new Path(segmentDir, subDir), this.conf);
}
@@ -112,27 +188,33 @@
}
- private HashMap segments = new HashMap();
+ private ConcurrentMap segments =
+ new ConcurrentHashMap();
+ private FileSystem fs;
+ private Configuration conf;
+ private Path segmentsDir;
+ private SegmentUpdater segUpdater;
private Summarizer summarizer;
-
+
/** Construct given a directory containing fetcher output. */
- public FetchedSegments(FileSystem fs, String segmentsDir, Configuration conf) throws IOException {
- FileStatus[] fstats = fs.listStatus(new Path(segmentsDir),
+ public FetchedSegments(Configuration conf, Path segmentsDir)
+ throws IOException {
+ this.conf = conf;
+ this.fs = FileSystem.get(this.conf);
+ FileStatus[] fstats = fs.listStatus(segmentsDir,
HadoopFSUtil.getPassDirectoriesFilter(fs));
Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
- this.summarizer = new SummarizerFactory(conf).getSummarizer();
+ this.summarizer = new SummarizerFactory(this.conf).getSummarizer();
+ this.segmentsDir = segmentsDir;
+ this.segUpdater = new SegmentUpdater();
if (segmentDirs != null) {
- for (int i = 0; i < segmentDirs.length; i++) {
- Path segmentDir = segmentDirs[i];
-// Path indexdone = new Path(segmentDir, IndexSegment.DONE_NAME);
-// if (fs.exists(indexdone) && fs.isFile(indexdone)) {
-// segments.put(segmentDir.getName(), new Segment(fs, segmentDir));
-// }
- segments.put(segmentDir.getName(), new Segment(fs, segmentDir, conf));
-
- }
+ for (Path segmentDir : segmentDirs) {
+ segments.put(segmentDir.getName(),
+ new Segment(this.fs, segmentDir, this.conf));
+ }
}
+ this.segUpdater.start();
}
public String[] getSegmentNames() {
@@ -167,51 +249,41 @@
return this.summarizer.getSummary(text, query);
}
-
- private class SummaryThread extends Thread {
- private HitDetails details;
- private Query query;
-
- private Summary summary;
- private Throwable throwable;
-
- public SummaryThread(HitDetails details, Query query) {
- this.details = details;
- this.query = query;
- }
-
- public void run() {
- try {
- this.summary = getSummary(details, query);
- } catch (Throwable throwable) {
- this.throwable = throwable;
- }
- }
-
+
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return VERSION;
}
-
public Summary[] getSummary(HitDetails[] details, Query query)
throws IOException {
- SummaryThread[] threads = new SummaryThread[details.length];
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new SummaryThread(details[i], query);
- threads[i].start();
+ List> tasks =
+ new ArrayList>(details.length);
+ for (int i = 0; i < details.length; i++) {
+ tasks.add(new SummaryTask(details[i], query));
+ }
+
+ List> summaries;
+ try {
+ summaries = executor.invokeAll(tasks);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
+
Summary[] results = new Summary[details.length];
- for (int i = 0; i < threads.length; i++) {
+ for (int i = 0; i < details.length; i++) {
+ Future f = summaries.get(i);
+ Summary summary;
try {
- threads[i].join();
- } catch (InterruptedException e) {
+ summary = f.get();
+ } catch (Exception e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
throw new RuntimeException(e);
}
- if (threads[i].throwable instanceof IOException) {
- throw (IOException)threads[i].throwable;
- } else if (threads[i].throwable != null) {
- throw new RuntimeException(threads[i].throwable);
- }
- results[i] = threads[i].summary;
+ results[i] = summary;
}
return results;
}
@@ -218,7 +290,7 @@
private Segment getSegment(HitDetails details) {
- return (Segment)segments.get(details.getValue("segment"));
+ return segments.get(details.getValue("segment"));
}
private Text getUrl(HitDetails details) {
@@ -230,9 +302,9 @@
}
public void close() throws IOException {
- Iterator iterator = segments.values().iterator();
+ Iterator iterator = segments.values().iterator();
while (iterator.hasNext()) {
- ((Segment) iterator.next()).close();
+ iterator.next().close();
}
}
Index: src/java/org/apache/nutch/searcher/Hit.java
===================================================================
--- src/java/org/apache/nutch/searcher/Hit.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/Hit.java (working copy)
@@ -21,6 +21,7 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -25,10 +26,11 @@
import org.apache.hadoop.io.WritableComparable;
/** A document which matched a query in an index. */
-public class Hit implements Writable, Comparable {
+@SuppressWarnings("unchecked")
+public class Hit implements Writable, Comparable {
private int indexNo; // index id
- private int indexDocNo; // index-relative id
+ private String uniqueKey;
private WritableComparable sortValue; // value sorted on
private String dedupValue; // value to dedup on
private boolean moreFromDupExcluded;
@@ -35,17 +37,17 @@
public Hit() {}
- public Hit(int indexNo, int indexDocNo) {
- this(indexNo, indexDocNo, null, null);
+ public Hit(int indexNo, String uniqueKey) {
+ this(indexNo, uniqueKey, null, null);
}
- public Hit(int indexNo, int indexDocNo,
+ public Hit(int indexNo, String uniqueKey,
WritableComparable sortValue,
String dedupValue) {
- this(indexDocNo, sortValue, dedupValue);
+ this(uniqueKey, sortValue, dedupValue);
this.indexNo = indexNo;
}
- public Hit(int indexDocNo, WritableComparable sortValue, String dedupValue) {
- this.indexDocNo = indexDocNo;
+ public Hit(String uniqueKey, WritableComparable sortValue, String dedupValue) {
+ this.uniqueKey = uniqueKey;
this.sortValue = sortValue;
this.dedupValue = dedupValue == null ? "" : dedupValue;
}
@@ -55,7 +57,7 @@
public void setIndexNo(int indexNo) { this.indexNo = indexNo; }
/** Return the document number of this hit within an index. */
- public int getIndexDocNo() { return indexDocNo; }
+ public String getUniqueKey() { return uniqueKey; }
/** Return the value of the field that hits are sorted on. */
public WritableComparable getSortValue() { return sortValue; }
@@ -73,23 +75,10 @@
/** Display as a string. */
public String toString() {
- return "#" + indexDocNo;
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof Hit))
- return false;
- Hit other = (Hit)o;
- return this.indexNo == other.indexNo
- && this.indexDocNo == other.indexDocNo;
- }
-
- public int hashCode() {
- return indexNo ^ indexDocNo;
+ return "#" + uniqueKey;
}
- public int compareTo(Object o) {
- Hit other = (Hit)o;
+ public int compareTo(Hit other) {
int compare = sortValue.compareTo(other.sortValue);
if (compare != 0) {
return compare; // use sortValue
@@ -96,7 +85,7 @@
} else if (other.indexNo != this.indexNo) {
return other.indexNo - this.indexNo; // prefer later indexes
} else {
- return other.indexDocNo - this.indexDocNo; // prefer later docs
+ return other.uniqueKey.compareTo(this.uniqueKey); // prefer later doc
}
}
@@ -101,11 +90,11 @@
}
public void write(DataOutput out) throws IOException {
- out.writeInt(indexDocNo);
+ Text.writeString(out, uniqueKey);
}
public void readFields(DataInput in) throws IOException {
- indexDocNo = in.readInt();
+ uniqueKey = Text.readString(in);
}
}
Index: src/java/org/apache/nutch/searcher/HitDetails.java
===================================================================
--- src/java/org/apache/nutch/searcher/HitDetails.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/HitDetails.java (working copy)
@@ -73,7 +73,7 @@
/** Returns all the values with the specified name. */
public String[] getValues(String field) {
- ArrayList vals = new ArrayList();
+ ArrayList vals = new ArrayList();
for (int i=0; i 0)
- ? (String[]) vals.toArray(new String[vals.size()])
+ ? vals.toArray(new String[vals.size()])
: null;
}
Index: src/java/org/apache/nutch/searcher/Hits.java
===================================================================
--- src/java/org/apache/nutch/searcher/Hits.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/Hits.java (working copy)
@@ -65,7 +65,6 @@
return results;
}
-
public void write(DataOutput out) throws IOException {
out.writeLong(total); // write total hits
out.writeInt(top.length); // write hits returned
@@ -74,7 +73,7 @@
for (int i = 0; i < top.length; i++) {
Hit h = top[i];
- out.writeInt(h.getIndexDocNo()); // write indexDocNo
+ Text.writeString(out, h.getUniqueKey()); // write uniqueKey
h.getSortValue().write(out); // write sortValue
Text.writeString(out, h.getDedupValue()); // write dedupValue
}
@@ -80,6 +79,7 @@
}
}
+ @SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
total = in.readLong(); // read total hits
top = new Hit[in.readInt()]; // read hits returned
@@ -93,7 +93,7 @@
}
for (int i = 0; i < top.length; i++) {
- int indexDocNo = in.readInt(); // read indexDocNo
+ String uniqueKey = Text.readString(in); // read uniqueKey
WritableComparable sortValue = null;
try {
@@ -105,7 +105,7 @@
String dedupValue = Text.readString(in); // read dedupValue
- top[i] = new Hit(indexDocNo, sortValue, dedupValue);
+ top[i] = new Hit(uniqueKey, sortValue, dedupValue);
}
}
Index: src/java/org/apache/nutch/searcher/IndexSearcher.java
===================================================================
--- src/java/org/apache/nutch/searcher/IndexSearcher.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/IndexSearcher.java (working copy)
@@ -103,7 +103,7 @@
public String getExplanation(Query query, Hit hit) throws IOException {
return luceneSearcher.explain(this.queryFilters.filter(query),
- hit.getIndexDocNo()).toHtml();
+ Integer.valueOf(hit.getUniqueKey())).toHtml();
}
public HitDetails getDetails(Hit hit) throws IOException {
@@ -108,7 +108,7 @@
public HitDetails getDetails(Hit hit) throws IOException {
- Document doc = luceneSearcher.doc(hit.getIndexDocNo());
+ Document doc = luceneSearcher.doc(Integer.valueOf(hit.getUniqueKey()));
List docFields = doc.getFields();
String[] fields = new String[docFields.size()];
@@ -162,7 +162,7 @@
String dedupValue = dedupValues == null ? null : dedupValues[doc];
- hits[i] = new Hit(doc, sortValue, dedupValue);
+ hits[i] = new Hit(Integer.toString(doc), sortValue, dedupValue);
}
return new Hits(topDocs.totalHits, hits);
}
Index: src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java
===================================================================
--- src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java (working copy)
@@ -18,7 +18,6 @@
package org.apache.nutch.searcher;
import org.apache.lucene.search.Searcher;
-import org.apache.lucene.search.QueryFilter;
import org.apache.lucene.search.*;
import org.apache.lucene.index.Term;
import org.apache.lucene.misc.ChainedFilter;
@@ -83,6 +82,7 @@
}
+ @SuppressWarnings("serial")
private static class TimeExceeded extends RuntimeException {
public long maxTime;
private int maxDoc;
@@ -127,7 +127,8 @@
}
}
- private static class LimitExceeded extends RuntimeException {
+ @SuppressWarnings("serial")
+private static class LimitExceeded extends RuntimeException {
private int maxDoc;
public LimitExceeded(int maxDoc) { this.maxDoc = maxDoc; }
}
@@ -151,7 +152,8 @@
* @param threshold
* the fraction of documents which must contain a term
*/
- public LuceneQueryOptimizer(Configuration conf) {
+ @SuppressWarnings("serial")
+public LuceneQueryOptimizer(Configuration conf) {
final int cacheSize = conf.getInt("searcher.filter.cache.size", 16);
this.threshold = conf.getFloat("searcher.filter.cache.threshold",
0.05f);
@@ -157,7 +159,7 @@
0.05f);
this.searcherMaxHits = conf.getInt("searcher.max.hits", -1);
this.cache = new LinkedHashMap(cacheSize, 0.75f, true) {
- protected boolean removeEldestEntry(Map.Entry eldest) {
+ protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > cacheSize; // limit size of cache
}
};
Index: src/java/org/apache/nutch/searcher/LuceneSearchBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/LuceneSearchBean.java (revision 0)
+++ src/java/org/apache/nutch/searcher/LuceneSearchBean.java (revision 0)
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nutch.indexer.Indexer;
+import org.apache.nutch.util.HadoopFSUtil;
+
+public class LuceneSearchBean implements RPCSearchBean {
+
+ public static final long VERSION = 1L;
+
+ private IndexSearcher searcher;
+
+ private FileSystem fs;
+
+ private Configuration conf;
+
+  /** Opens a searcher over the merged index, or else over the part indexes.
+   * @param conf configuration used to obtain the file system
+   * @param indexDir merged index directory; used when it exists
+   * @param indexesDir parent of the part indexes; used otherwise
+   * @throws IOException propagated from file-system or index access
+   */
+  public LuceneSearchBean(Configuration conf, Path indexDir, Path indexesDir)
+  throws IOException {
+    this.conf = conf;
+    this.fs = FileSystem.get(this.conf);
+    init(indexDir, indexesDir);
+  }
+
+  /** Opens the merged index at indexDir if it exists; otherwise opens every
+   *  directory under indexesDir that contains the Indexer.DONE_NAME file. */
+  private void init(Path indexDir, Path indexesDir)
+  throws IOException {
+    if (this.fs.exists(indexDir)) {
+      LOG.info("opening merged index in " + indexDir);
+      this.searcher = new IndexSearcher(indexDir, this.conf);
+      return;
+    }
+
+    LOG.info("opening indexes in " + indexesDir);
+    FileStatus[] fstats =
+      fs.listStatus(indexesDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
+    Path[] directories = HadoopFSUtil.getPaths(fstats);
+    List<Path> vDirs = new ArrayList<Path>();
+    if (directories != null) {          // nothing listed: open no indexes
+      for (Path directory : directories) {
+        if (fs.isFile(new Path(directory, Indexer.DONE_NAME))) {
+          vDirs.add(directory);
+        }
+      }
+    }
+    // toArray replaces the quadratic remove(0) copy loop
+    this.searcher =
+      new IndexSearcher(vDirs.toArray(new Path[vDirs.size()]), this.conf);
+  }
+
+ public Hits search(Query query, int numHits, String dedupField,
+ String sortField, boolean reverse)
+ throws IOException {
+ return searcher.search(query, numHits, dedupField, sortField, reverse);
+ }
+
+ public String getExplanation(Query query, Hit hit) throws IOException {
+ return searcher.getExplanation(query, hit);
+ }
+
+ public HitDetails getDetails(Hit hit) throws IOException {
+ return searcher.getDetails(hit);
+ }
+
+ public HitDetails[] getDetails(Hit[] hits) throws IOException {
+ return searcher.getDetails(hits);
+ }
+
+ public boolean ping() throws IOException {
+ return true;
+ }
+
+ public void close() throws IOException {
+ if (searcher != null) { searcher.close(); }
+ if (fs != null) { fs.close(); }
+ }
+
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return VERSION;
+ }
+
+}
Index: src/java/org/apache/nutch/searcher/NutchBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/NutchBean.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/NutchBean.java (working copy)
@@ -18,7 +18,9 @@
package org.apache.nutch.searcher;
import java.io.*;
+import java.net.InetSocketAddress;
import java.util.*;
+
import javax.servlet.*;
import org.apache.commons.logging.Log;
@@ -25,6 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Closeable;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.StringUtils;
@@ -29,9 +32,7 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.StringUtils;
import org.apache.nutch.parse.*;
-import org.apache.nutch.indexer.*;
import org.apache.nutch.crawl.Inlinks;
-import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.NutchConfiguration;
/**
@@ -39,8 +40,7 @@
* @version $Id: NutchBean.java,v 1.19 2005/02/07 19:10:08 cutting Exp $
*/
public class NutchBean
- implements Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks,
- DistributedSearch.Protocol, Closeable {
+implements SearchBean, SegmentBean, HitInlinks, Closeable {
public static final Log LOG = LogFactory.getLog(NutchBean.class);
public static final String KEY = "nutchBean";
@@ -51,10 +51,8 @@
private String[] segmentNames;
- private Searcher searcher;
- private HitDetailer detailer;
- private HitSummarizer summarizer;
- private HitContent content;
+ private SearchBean searchBean;
+ private SegmentBean segmentBean;
private HitInlinks linkDb;
@@ -84,7 +82,8 @@
}
/**
- * Construct in a named directory.
+ * Construct in a named directory.
+ *
* @param conf
* @param dir
* @throws IOException
@@ -90,83 +89,67 @@
* @throws IOException
*/
public NutchBean(Configuration conf, Path dir) throws IOException {
- this.conf = conf;
- this.fs = FileSystem.get(this.conf);
- if (dir == null) {
- dir = new Path(this.conf.get("searcher.dir", "crawl"));
- }
- Path servers = new Path(dir, "search-servers.txt");
- if (fs.exists(servers)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("searching servers in " + servers);
- }
- init(new DistributedSearch.Client(servers, conf));
- } else {
- init(new Path(dir, "index"), new Path(dir, "indexes"), new Path(
- dir, "segments"), new Path(dir, "linkdb"));
- }
+ this.conf = conf;
+ this.fs = FileSystem.get(this.conf);
+ if (dir == null) {
+ dir = new Path(this.conf.get("searcher.dir", "crawl"));
+ }
+ Path luceneConfig = new Path(dir, "search-servers.txt");
+ Path solrConfig = new Path(dir, "solr-servers.txt");
+ Path segmentConfig = new Path(dir, "segment-servers.txt");
+
+ if (fs.exists(luceneConfig) || fs.exists(solrConfig)) {
+ searchBean = new DistributedSearchBean(conf, luceneConfig, solrConfig);
+ } else {
+ Path indexDir = new Path(dir, "index");
+ Path indexesDir = new Path(dir, "indexes");
+ searchBean = new LuceneSearchBean(conf, indexDir, indexesDir);
}
- private void init(Path indexDir, Path indexesDir, Path segmentsDir,
- Path linkDb)
- throws IOException {
- IndexSearcher indexSearcher;
- if (this.fs.exists(indexDir)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("opening merged index in " + indexDir);
- }
- indexSearcher = new IndexSearcher(indexDir, this.conf);
+ if (fs.exists(segmentConfig)) {
+ segmentBean = new DistributedSegmentBean(conf, segmentConfig);
+ } else if (fs.exists(luceneConfig)) {
+ segmentBean = new DistributedSegmentBean(conf, luceneConfig);
} else {
- if (LOG.isInfoEnabled()) {
- LOG.info("opening indexes in " + indexesDir);
- }
-
- Vector vDirs=new Vector();
- FileStatus[] fstats = fs.listStatus(indexesDir,
- HadoopFSUtil.getPassDirectoriesFilter(fs));
- Path [] directories = HadoopFSUtil.getPaths(fstats);
- for(int i = 0; i < directories.length; i++) {
- Path indexdone = new Path(directories[i], Indexer.DONE_NAME);
- if(fs.isFile(indexdone)) {
- vDirs.add(directories[i]);
+ segmentBean = new FetchedSegments(conf, new Path(dir, "segments"));
+ }
+
+ linkDb = new LinkDbInlinks(fs, new Path(dir, "linkdb"), conf);
+ }
+
+ /**
+ * Parses a server list file: every line holding at least "host port"
+ * (whitespace separated) yields one address. Lines with fewer than two
+ * tokens are skipped and tokens after the port are ignored.
+ *
+ * @throws NumberFormatException if a port token is not an integer
+ */
+ public static List<InetSocketAddress> readAddresses(Path path,
+ Configuration conf) throws IOException {
+ List<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
+ for (String line : readConfig(path, conf)) {
+ StringTokenizer tokens = new StringTokenizer(line);
+ if (tokens.hasMoreTokens()) {
+ String host = tokens.nextToken();
+ if (tokens.hasMoreTokens()) {
+ String port = tokens.nextToken();
+ addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
}
}
-
-
- directories = new Path[ vDirs.size() ];
- for(int i = 0; vDirs.size()>0; i++) {
- directories[i]=(Path)vDirs.remove(0);
- }
-
- indexSearcher = new IndexSearcher(directories, this.conf);
}
+ return addrs;
+ }
- if (LOG.isInfoEnabled()) {
- LOG.info("opening segments in " + segmentsDir);
+ /**
+ * Reads the file at path from the configured FileSystem and returns its
+ * lines in order, unmodified (blank lines included).
+ */
+ public static List<String> readConfig(Path path, Configuration conf)
+ throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ // Decode explicitly: the no-charset constructor uses the JVM default.
+ BufferedReader reader =
+ new BufferedReader(new InputStreamReader(fs.open(path), "UTF-8"));
+ try {
+ List<String> lines = new ArrayList<String>();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ lines.add(line);
+ }
+ return lines;
+ } finally {
+ reader.close();
}
- FetchedSegments segments = new FetchedSegments(this.fs, segmentsDir.toString(),this.conf);
-
- this.segmentNames = segments.getSegmentNames();
-
- this.searcher = indexSearcher;
- this.detailer = indexSearcher;
- this.summarizer = segments;
- this.content = segments;
-
- if (LOG.isInfoEnabled()) { LOG.info("opening linkdb in " + linkDb); }
- this.linkDb = new LinkDbInlinks(fs, linkDb, this.conf);
- }
-
- private void init(DistributedSearch.Client client) {
- this.segmentNames = client.getSegmentNames();
- this.searcher = client;
- this.detailer = client;
- this.summarizer = client;
- this.content = client;
- this.linkDb = client;
}
-
public String[] getSegmentNames() {
return segmentNames;
}
@@ -179,10 +162,11 @@
String dedupField, String sortField, boolean reverse)
throws IOException {
- return searcher.search(query, numHits, dedupField, sortField, reverse);
+ return searchBean.search(query, numHits, dedupField, sortField, reverse);
}
- private class DupHits extends ArrayList {
+ @SuppressWarnings("serial")
+ private class DupHits extends ArrayList<Hit> { // the hits that share one dedup value
private boolean maxSizeExceeded;
}
@@ -248,13 +232,13 @@
if (LOG.isInfoEnabled()) {
LOG.info("searching for "+numHitsRaw+" raw hits");
}
- Hits hits = searcher.search(query, numHitsRaw,
+ Hits hits = searchBean.search(query, numHitsRaw,
dedupField, sortField, reverse);
long total = hits.getTotal();
- Map dupToHits = new HashMap();
- List resultList = new ArrayList();
- Set seen = new HashSet();
- List excludedValues = new ArrayList();
+ Map dupToHits = new HashMap();
+ List resultList = new ArrayList();
+ Set seen = new HashSet();
+ List excludedValues = new ArrayList();
boolean totalIsExact = true;
for (int rawHitNum = 0; rawHitNum < hits.getTotal(); rawHitNum++) {
// get the next raw hit
@@ -264,7 +248,7 @@
for (int i = 0; i < excludedValues.size(); i++) {
if (i == MAX_PROHIBITED_TERMS)
break;
- optQuery.addProhibitedTerm(((String)excludedValues.get(i)),
+ optQuery.addProhibitedTerm(excludedValues.get(i),
dedupField);
}
numHitsRaw = (int)(numHitsRaw * rawHitsFactor);
@@ -271,7 +255,7 @@
if (LOG.isInfoEnabled()) {
LOG.info("re-searching for "+numHitsRaw+" raw hits, query: "+optQuery);
}
- hits = searcher.search(optQuery, numHitsRaw,
+ hits = searchBean.search(optQuery, numHitsRaw,
dedupField, sortField, reverse);
if (LOG.isInfoEnabled()) {
LOG.info("found "+hits.getTotal()+" raw hits");
@@ -287,7 +271,7 @@
// get dup hits for its value
String value = hit.getDedupValue();
- DupHits dupHits = (DupHits)dupToHits.get(value);
+ DupHits dupHits = dupToHits.get(value);
if (dupHits == null)
dupToHits.put(value, dupHits = new DupHits());
@@ -297,7 +281,7 @@
// mark prior hits with moreFromDupExcluded
for (int i = 0; i < dupHits.size(); i++) {
- ((Hit)dupHits.get(i)).setMoreFromDupExcluded(true);
+ dupHits.get(i).setMoreFromDupExcluded(true);
}
dupHits.maxSizeExceeded = true;
@@ -318,7 +302,7 @@
Hits results =
new Hits(total,
- (Hit[])resultList.toArray(new Hit[resultList.size()]));
+ resultList.toArray(new Hit[resultList.size()]));
results.setTotalIsExact(totalIsExact);
return results;
}
@@ -325,19 +309,19 @@
public String getExplanation(Query query, Hit hit) throws IOException {
- return searcher.getExplanation(query, hit);
+ return searchBean.getExplanation(query, hit);
}
public HitDetails getDetails(Hit hit) throws IOException {
- return detailer.getDetails(hit);
+ return searchBean.getDetails(hit);
}
public HitDetails[] getDetails(Hit[] hits) throws IOException {
- return detailer.getDetails(hits);
+ return searchBean.getDetails(hits);
}
public Summary getSummary(HitDetails hit, Query query) throws IOException {
- return summarizer.getSummary(hit, query);
+ return segmentBean.getSummary(hit, query);
}
public Summary[] getSummary(HitDetails[] hits, Query query)
@@ -342,19 +326,19 @@
public Summary[] getSummary(HitDetails[] hits, Query query)
throws IOException {
- return summarizer.getSummary(hits, query);
+ return segmentBean.getSummary(hits, query);
}
public byte[] getContent(HitDetails hit) throws IOException {
- return content.getContent(hit);
+ return segmentBean.getContent(hit);
}
public ParseData getParseData(HitDetails hit) throws IOException {
- return content.getParseData(hit);
+ return segmentBean.getParseData(hit);
}
public ParseText getParseText(HitDetails hit) throws IOException {
- return content.getParseText(hit);
+ return segmentBean.getParseText(hit);
}
public String[] getAnchors(HitDetails hit) throws IOException {
@@ -366,12 +350,12 @@
}
public long getFetchDate(HitDetails hit) throws IOException {
- return content.getFetchDate(hit);
+ return segmentBean.getFetchDate(hit);
}
public void close() throws IOException {
- if (content != null) { content.close(); }
- if (searcher != null) { searcher.close(); }
+ if (searchBean != null) { searchBean.close(); }
+ if (segmentBean != null) { segmentBean.close(); }
if (linkDb != null) { linkDb.close(); }
if (fs != null) { fs.close(); }
}
@@ -376,6 +360,10 @@
if (fs != null) { fs.close(); }
}
+ public boolean ping() {
+ return true; // unconditionally reports healthy; the wrapped beans are not probed
+ }
+
/** For debugging. */
public static void main(String[] args) throws Exception {
String usage = "NutchBean query";
@@ -396,13 +384,22 @@
Summary[] summaries = bean.getSummary(details, query);
for (int i = 0; i < hits.getLength(); i++) {
- System.out.println(" "+i+" "+ details[i] + "\n" + summaries[i]);
+ System.out.println(" " + i + " " + details[i] + "\n" + summaries[i]);
}
}
- public long getProtocolVersion(String className, long arg1) throws IOException {
- if(DistributedSearch.Protocol.class.getName().equals(className)){
- return 1;
+ /**
+ * Answers the Hadoop RPC version handshake by forwarding to whichever wrapped
+ * bean is itself an RPC bean for the requested protocol class.
+ *
+ * @throws IOException if className names neither served protocol
+ */
+ public long getProtocolVersion(String className, long clientVersion)
+ throws IOException {
+ if(RPCSearchBean.class.getName().equals(className) &&
+ searchBean instanceof RPCSearchBean) {
+
+ RPCSearchBean rpcBean = (RPCSearchBean)searchBean;
+ return rpcBean.getProtocolVersion(className, clientVersion);
+ } else if ((RPCSegmentBean.class.getName().equals(className) ||
+ SegmentBean.class.getName().equals(className)) &&
+ segmentBean instanceof RPCSegmentBean) {
+ // RPCSegmentBean is the VersionedProtocol interface (mirrors the search
+ // branch); the plain SegmentBean name is still accepted for old callers.
+ RPCSegmentBean rpcBean = (RPCSegmentBean)segmentBean;
+ return rpcBean.getProtocolVersion(className, clientVersion);
} else {
throw new IOException("Unknown Protocol classname:" + className);
}
Index: src/java/org/apache/nutch/searcher/OpenSearchServlet.java
===================================================================
--- src/java/org/apache/nutch/searcher/OpenSearchServlet.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/OpenSearchServlet.java (working copy)
@@ -43,8 +43,10 @@
/** Present search results using A9's OpenSearch extensions to RSS, plus a few
* Nutch-specific extensions. */
+@SuppressWarnings("serial")
public class OpenSearchServlet extends HttpServlet {
- private static final Map NS_MAP = new HashMap();
+ private static final Map NS_MAP =
+ new HashMap();
private int MAX_HITS_PER_PAGE;
static {
@@ -52,7 +54,7 @@
NS_MAP.put("nutch", "http://www.nutch.org/opensearchrss/1.0/");
}
- private static final Set SKIP_DETAILS = new HashSet();
+ private static final Set SKIP_DETAILS = new HashSet();
static {
SKIP_DETAILS.add("url"); // redundant with RSS link
SKIP_DETAILS.add("title"); // redundant with RSS title
@@ -171,8 +173,8 @@
Element rss = addNode(doc, doc, "rss");
addAttribute(doc, rss, "version", "2.0");
addAttribute(doc, rss, "xmlns:opensearch",
- (String)NS_MAP.get("opensearch"));
- addAttribute(doc, rss, "xmlns:nutch", (String)NS_MAP.get("nutch"));
+ NS_MAP.get("opensearch"));
+ addAttribute(doc, rss, "xmlns:nutch", NS_MAP.get("nutch"));
Element channel = addNode(doc, rss, "channel");
@@ -214,7 +216,7 @@
HitDetails detail = details[i];
String title = detail.getValue("title");
String url = detail.getValue("url");
- String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getIndexDocNo();
+ String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getUniqueKey();
if (title == null || title.equals("")) { // use url for docs w/o title
title = url;
@@ -283,7 +285,7 @@
private static void addNode(Document doc, Node parent,
String ns, String name, String text) {
- Element child = doc.createElementNS((String)NS_MAP.get(ns), ns+":"+name);
+ Element child = doc.createElementNS(NS_MAP.get(ns), ns+":"+name);
child.appendChild(doc.createTextNode(getLegalXml(text)));
parent.appendChild(child);
}
Index: src/java/org/apache/nutch/searcher/Query.java
===================================================================
--- src/java/org/apache/nutch/searcher/Query.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/Query.java (working copy)
@@ -282,7 +282,7 @@
}
- private ArrayList clauses = new ArrayList();
+ private ArrayList clauses = new ArrayList();
private Configuration conf;
@@ -305,7 +305,7 @@
/** Return all clauses. */
public Clause[] getClauses() {
- return (Clause[])clauses.toArray(CLAUSES_PROTO);
+ return clauses.toArray(CLAUSES_PROTO);
}
/** Add a required term in the default field. */
@@ -361,7 +361,7 @@
public void write(DataOutput out) throws IOException {
out.writeByte(clauses.size());
for (int i = 0; i < clauses.size(); i++)
- ((Clause)clauses.get(i)).write(out);
+ clauses.get(i).write(out);
}
public static Query read(DataInput in, Configuration conf) throws IOException {
@@ -404,7 +404,7 @@
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
- clone.clauses = (ArrayList)clauses.clone();
+ clone.clauses = (ArrayList)clauses.clone();
return clone;
}
@@ -412,9 +412,9 @@
/** Flattens a query into the set of text terms that it contains. These are
* terms which should be higlighted in matching documents. */
public String[] getTerms() {
- ArrayList result = new ArrayList();
+ ArrayList result = new ArrayList();
for (int i = 0; i < clauses.size(); i++) {
- Clause clause = (Clause)clauses.get(i);
+ Clause clause = clauses.get(i);
if (!clause.isProhibited()) {
if (clause.isPhrase()) {
Term[] terms = clause.getPhrase().getTerms();
@@ -426,7 +426,7 @@
}
}
}
- return (String[])result.toArray(new String[result.size()]);
+ return result.toArray(new String[result.size()]);
}
/**
@@ -457,7 +457,7 @@
for (int i = 0; i < clauses.length; i++) {
Clause c = clauses[i];
if (!new QueryFilters(conf).isField(c.getField())) { // unknown field
- ArrayList terms = new ArrayList(); // add name to query
+ ArrayList terms = new ArrayList(); // add name to query
if (c.isPhrase()) {
terms.addAll(Arrays.asList(c.getPhrase().getTerms()));
} else {
@@ -467,7 +467,7 @@
c = (Clause)c.clone();
c.field = Clause.DEFAULT_FIELD; // use default field instead
c.termOrPhrase
- = new Phrase((Term[])terms.toArray(new Term[terms.size()]));
+ = new Phrase(terms.toArray(new Term[terms.size()]));
}
output.clauses.add(c); // copy clause to output
}
Index: src/java/org/apache/nutch/searcher/QueryException.java
===================================================================
--- src/java/org/apache/nutch/searcher/QueryException.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/QueryException.java (working copy)
@@ -17,6 +17,7 @@
package org.apache.nutch.searcher;
+@SuppressWarnings("serial")
public class QueryException extends java.io.IOException {
public QueryException(String message) {
super(message);
Index: src/java/org/apache/nutch/searcher/QueryFilters.java
===================================================================
--- src/java/org/apache/nutch/searcher/QueryFilters.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/QueryFilters.java (working copy)
@@ -40,14 +40,14 @@
private static final Log LOG = LogFactory.getLog(QueryFilters.class);
private QueryFilter[] queryFilters;
- private HashSet FIELD_NAMES ;
- private HashSet RAW_FIELD_NAMES;
+ private HashSet FIELD_NAMES ;
+ private HashSet RAW_FIELD_NAMES;
- private static ArrayList parseFieldNames(Extension extension,
+ /**
+ * Splits the named extension attribute into field names. Commas and
+ * whitespace separate names; a missing or blank attribute gives an empty list.
+ */
+ private static List<String> parseFieldNames(Extension extension,
String attribute) {
String fields = extension.getAttribute(attribute);
if (fields == null) fields = "";
- return Collections.list(new StringTokenizer(fields, " ,\t\n\r"));
+ // String.split() returns [""] for an empty string and "" between adjacent
+ // delimiters, which would defeat the size() == 0 check in the constructor.
+ // StringTokenizer never produces empty tokens.
+ List<String> names = new ArrayList<String>();
+ StringTokenizer tokens = new StringTokenizer(fields, " ,\t\n\r");
+ while (tokens.hasMoreTokens()) {
+ names.add(tokens.nextToken());
+ }
+ return names;
}
public QueryFilters(Configuration conf) {
@@ -61,13 +61,14 @@
if (point == null)
throw new RuntimeException(QueryFilter.X_POINT_ID + " not found.");
Extension[] extensions = point.getExtensions();
- FIELD_NAMES = new HashSet();
- RAW_FIELD_NAMES = new HashSet();
+ FIELD_NAMES = new HashSet();
+ RAW_FIELD_NAMES = new HashSet();
QueryFilter[] filters = new QueryFilter[extensions.length];
for (int i = 0; i < extensions.length; i++) {
Extension extension = extensions[i];
- ArrayList fieldNames = parseFieldNames(extension, "fields");
- ArrayList rawFieldNames = parseFieldNames(extension, "raw-fields");
+ List fieldNames = parseFieldNames(extension, "fields");
+ List rawFieldNames =
+ parseFieldNames(extension, "raw-fields");
if (fieldNames.size() == 0 && rawFieldNames.size() == 0) {
if (LOG.isWarnEnabled()) {
LOG.warn("QueryFilter: " + extension.getId()
@@ -90,8 +91,8 @@
.getName());
} else {
// cache already filled
- FIELD_NAMES = (HashSet) objectCache.getObject("FIELD_NAMES");
- RAW_FIELD_NAMES = (HashSet) objectCache.getObject("RAW_FIELD_NAMES");
+ FIELD_NAMES = (HashSet) objectCache.getObject("FIELD_NAMES");
+ RAW_FIELD_NAMES = (HashSet) objectCache.getObject("RAW_FIELD_NAMES");
}
}
Index: src/java/org/apache/nutch/searcher/RPCSearchBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/RPCSearchBean.java (revision 0)
+++ src/java/org/apache/nutch/searcher/RPCSearchBean.java (revision 0)
@@ -0,0 +1,7 @@
+package org.apache.nutch.searcher;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+ public interface RPCSearchBean extends SearchBean, VersionedProtocol {
+ // Declares no methods of its own: it only combines SearchBean with Hadoop IPC's VersionedProtocol.
+ }
Index: src/java/org/apache/nutch/searcher/RPCSegmentBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/RPCSegmentBean.java (revision 0)
+++ src/java/org/apache/nutch/searcher/RPCSegmentBean.java (revision 0)
@@ -0,0 +1,7 @@
+package org.apache.nutch.searcher;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+ public interface RPCSegmentBean extends SegmentBean, VersionedProtocol {
+ // Declares no methods of its own: it only combines SegmentBean with Hadoop IPC's VersionedProtocol.
+ }
Index: src/java/org/apache/nutch/searcher/SearchBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/SearchBean.java (revision 0)
+++ src/java/org/apache/nutch/searcher/SearchBean.java (revision 0)
@@ -0,0 +1,12 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+ public interface SearchBean extends Searcher, HitDetailer {
+ public static final Log LOG = LogFactory.getLog(SearchBean.class); // interface constant: visible as LOG in every implementing class that does not declare its own
+
+ public boolean ping() throws IOException ; // health probe for the backend behind this bean
+ }
Index: src/java/org/apache/nutch/searcher/SegmentBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/SegmentBean.java (revision 0)
+++ src/java/org/apache/nutch/searcher/SegmentBean.java (revision 0)
@@ -0,0 +1,8 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+
+ public interface SegmentBean extends HitContent, HitSummarizer {
+ // Adds segment enumeration to the content and summary lookups inherited from the two super-interfaces.
+ public String[] getSegmentNames() throws IOException;
+ }
Index: src/java/org/apache/nutch/searcher/SolrSearchBean.java
===================================================================
--- src/java/org/apache/nutch/searcher/SolrSearchBean.java (revision 0)
+++ src/java/org/apache/nutch/searcher/SolrSearchBean.java (revision 0)
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.searcher;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.parsers.SAXParserFactory;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.ToStringUtils;
+import org.apache.nutch.util.solr.SolrClient;
+import org.apache.nutch.util.solr.SolrResponseHandler;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+
+public class SolrSearchBean implements SearchBean {
+
+ public static final Log LOG = LogFactory.getLog(SolrSearchBean.class);
+
+ private static final Text MIN_TEXT_WRITABLE =
+ new Text("");
+
+ private static final IntWritable MIN_INT_WRITABLE =
+ new IntWritable(Integer.MIN_VALUE);
+
+  // Float.MIN_VALUE is the smallest POSITIVE float; the lowest finite value is -Float.MAX_VALUE.
+  private static final FloatWritable MIN_FLOAT_WRITABLE = new FloatWritable(-Float.MAX_VALUE);
+
+ /* We assume that an array is always an array of strings. */
+ public static enum FieldType { STR, INT, LONG, FLOAT,
+ DOUBLE, BOOL, DATE, ARR };
+
+ private SolrClient client;
+
+ private XMLReader reader;
+
+ private SolrResponseHandler handler;
+
+ private QueryFilters filters;
+
+  /** Creates a client for the Solr server at solrServer and a SAX reader wired to a SolrResponseHandler. */
+  public SolrSearchBean(Configuration conf, String solrServer)
+  throws IOException {
+    client = new SolrClient(solrServer);
+    handler = new SolrResponseHandler();
+    filters = new QueryFilters(conf);
+    try {
+      reader = SAXParserFactory.newInstance().newSAXParser().getXMLReader();
+      reader.setContentHandler(handler);
+    } catch (Exception e) {
+      IOException ioe = new IOException("Could not create XML reader: " + e);
+      throw (IOException) ioe.initCause(e);
+    }
+  }
+
+ public String getExplanation(Query query, Hit hit) throws IOException {
+ return "SOLR backend does not support explanations yet."; // fixed placeholder; query and hit are ignored
+ }
+
+ public Hits search(Query query, int numHits, String dedupField,
+ String sortField, boolean reverse)
+ throws IOException {
+ // Runs the filtered query against Solr and turns every returned document into a Hit (unique key, sort value, dedup value).
+ // filter query string
+ BooleanQuery bQuery = filters.filter(query);
+
+ byte[] response;
+ if (sortField == null) {
+ response = client.search(stringify(bQuery), numHits,
+ new String[] { dedupField, "score", "id" }, sortField, reverse);
+ sortField = "score"; // no explicit sort requested: the sort value is read from the returned score
+ } else {
+ response = client.search(stringify(bQuery), numHits,
+ new String[] { dedupField, sortField, "id" }, sortField, reverse);
+ }
+
+ try {
+ reader.parse(new InputSource(new ByteArrayInputStream(response))); // NOTE(review): reader and handler are instance fields; concurrent calls would share them - confirm callers serialize access
+ } catch (SAXException e) {
+ IOException ioe = new IOException();
+ ioe.initCause(e);
+ throw ioe;
+ }
+ Hit[] hitArr = new Hit[handler.getNumTopResults()];
+ for (int i = 0; i < hitArr.length; i++) {
+ Map result = handler.getResult(i);
+ FieldType sortFieldType = handler.getType(sortField);
+ if (sortFieldType == FieldType.ARR) {
+ throw new RuntimeException("Sort value shouldn't be an array");
+ }
+
+ String sortStr = (String) result.get(sortField);
+ WritableComparable sortValue;
+ if (sortFieldType == null) { // the handler has no type for the sort field: fall back to the float minimum
+ sortValue = MIN_FLOAT_WRITABLE;
+ } else {
+ switch (sortFieldType) {
+ case STR:
+ if (sortStr == null) {
+ sortValue = MIN_TEXT_WRITABLE;
+ } else {
+ sortValue = new Text(sortStr);
+ }
+ break;
+ case INT:
+ if (sortStr == null) {
+ sortValue = MIN_INT_WRITABLE;
+ } else {
+ sortValue = new IntWritable(Integer.valueOf(sortStr));
+ }
+ break;
+ case FLOAT:
+ if (sortStr == null) {
+ sortValue = MIN_FLOAT_WRITABLE;
+ } else {
+ sortValue = new FloatWritable(Float.valueOf(sortStr));
+ }
+ break;
+ case DATE:
+ // TODO
+ throw
+ new NotImplementedException("Sort on date is not implemented");
+ default: // LONG, DOUBLE and BOOL end up here (ARR was rejected above)
+ throw new RuntimeException("Unknown sort value type!");
+ }
+ }
+
+ FieldType dedupFieldType = handler.getType(dedupField);
+ if (dedupFieldType == FieldType.ARR) {
+ throw new RuntimeException("Dedup value shouldn't be an array");
+ }
+ String dedupValue = (String) result.get(dedupField);
+
+ String uniqueKey;
+ if (handler.getType("id") == FieldType.ARR) {
+ throw new RuntimeException("Unique key shouldn't be an array");
+ }
+ uniqueKey = (String) result.get("id");
+ if (uniqueKey == null) {
+ throw new RuntimeException("No value for unique key");
+ }
+
+ hitArr[i] = new Hit(uniqueKey, sortValue, dedupValue);
+ }
+
+ return new Hits(handler.getNumResults(), hitArr); // presumably the total match count, as opposed to the getNumTopResults() rows above - confirm in SolrResponseHandler
+ }
+
+ public void close() throws IOException { } // no-op: nothing is released here
+
+  /** Looks up the document whose id equals the hit's unique key and returns its fields; null when Solr reports no match. */
+  public HitDetails getDetails(Hit hit) throws IOException {
+    // Escape \ and " so a key containing them cannot terminate the quoted phrase.
+    String key = hit.getUniqueKey().replace("\\", "\\\\").replace("\"", "\\\"");
+    byte[] response = client.search("id:\"" + key + "\"", 1, null, null, false);
+    try {
+      reader.parse(new InputSource(new ByteArrayInputStream(response)));
+    } catch (SAXException e) {
+      IOException ioe = new IOException("Malformed Solr response: " + e);
+      throw (IOException) ioe.initCause(e);
+    }
+
+    if (handler.getNumResults() == 0) {
+      return null;
+    }
+    return buildDetails(handler.getResult(0));
+  }
+
+  /** Fetches the details of all hits with one Solr request; element n of the result belongs to hits[n]. */
+  public HitDetails[] getDetails(Hit[] hits) throws IOException {
+    if (hits.length == 0) {
+      return new HitDetails[0]; // "()" would not be a valid query
+    }
+    StringBuilder buf = new StringBuilder("(");
+    for (int i = 0; i < hits.length; i++) {
+      if (i > 0) {
+        buf.append(" OR "); // explicit, so the server's default operator does not matter
+      }
+      buf.append("id:\"");
+      buf.append(hits[i].getUniqueKey().replace("\\", "\\\\").replace("\"", "\\\""));
+      buf.append("\"");
+    }
+    buf.append(")");
+
+    byte[] response = client.search(buf.toString(), hits.length, null, null, false);
+    try {
+      reader.parse(new InputSource(new ByteArrayInputStream(response)));
+    } catch (SAXException e) {
+      IOException ioe = new IOException("Malformed Solr response: " + e);
+      throw (IOException) ioe.initCause(e);
+    }
+
+    int numResults = handler.getNumTopResults();
+    if (numResults < hits.length) {
+      throw new RuntimeException("Missing hit details! Found: " + numResults +
+                                 ", expecting: " + hits.length);
+    }
+
+    // Solr may answer in any order, so index the details by unique key ("id",
+    // the field the keys are read from in search()) and emit them in hit order.
+    Map<String, HitDetails> detailsMap = new HashMap<String, HitDetails>(hits.length);
+    for (int i = 0; i < numResults; i++) {
+      HitDetails details = buildDetails(handler.getResult(i));
+      detailsMap.put(details.getValue("id"), details);
+    }
+
+    HitDetails[] detailsArr = new HitDetails[hits.length];
+    for (int i = 0; i < hits.length; i++) {
+      detailsArr[i] = detailsMap.get(hits[i].getUniqueKey());
+    }
+    return detailsArr;
+  }
+
+ public boolean ping() throws IOException {
+ return client.ping(); // forwards the health check to the Solr client
+ }
+
+  /** Flattens one Solr result into the parallel field/value arrays of a HitDetails; an array value becomes one entry per element. */
+  private static HitDetails buildDetails(Map<String, Object> result) {
+    List<String> fieldList = new ArrayList<String>();
+    List<String> valueList = new ArrayList<String>();
+
+    for (Map.Entry<String, Object> entry : result.entrySet()) {
+      String field = entry.getKey();
+      Object o = entry.getValue();
+      if (o.getClass().isArray()) {
+        // arrays are assumed to hold strings (see the note on FieldType)
+        for (String val : (String[]) o) {
+          fieldList.add(field);
+          valueList.add(val);
+        }
+      } else {
+        fieldList.add(field);
+        valueList.add((String) o);
+      }
+    }
+
+    String[] fields = fieldList.toArray(new String[fieldList.size()]);
+    String[] values = valueList.toArray(new String[valueList.size()]);
+
+    return new HitDetails(fields, values);
+  }
+
+  /* Renders a BooleanQuery as a Solr query string (adapted from
+   * BooleanQuery.toString). Needed because toString yields clauses such as
+   * feed:http://www.google.com, which the parser rejects; term clauses are
+   * therefore written as quoted phrases: feed:"http://www.google.com".
+   */
+  private static String stringify(BooleanQuery bQuery) {
+    StringBuilder buffer = new StringBuilder();
+    boolean needParens=(bQuery.getBoost() != 1.0) ||
+      (bQuery.getMinimumNumberShouldMatch()>0) ;
+    if (needParens) {
+      buffer.append("(");
+    }
+
+    BooleanClause[] clauses = bQuery.getClauses();
+    int i = 0;
+    for (BooleanClause c : clauses) {
+      if (c.isProhibited())
+        buffer.append("-");
+      else if (c.isRequired())
+        buffer.append("+");
+
+      org.apache.lucene.search.Query subQuery = c.getQuery();
+      if (subQuery instanceof BooleanQuery) {  // wrap sub-bools in parens
+        buffer.append("(");
+        buffer.append(c.getQuery().toString("")); // NOTE(review): nested terms are still emitted unquoted
+        buffer.append(")");
+      } else if (subQuery instanceof TermQuery) {
+        Term term = ((TermQuery) subQuery).getTerm();
+        buffer.append(term.field()).append(":\"");
+        // Escape \ and " so the term text cannot terminate the quoted phrase.
+        buffer.append(term.text().replace("\\", "\\\\").replace("\"", "\\\""));
+        buffer.append("\""); // NOTE(review): a boost set on the TermQuery is not written out
+      } else {
+        buffer.append(" ");
+        buffer.append(c.getQuery().toString());
+      }
+
+      if (i++ != clauses.length - 1) {
+        buffer.append(" ");
+      }
+    }
+
+    if (needParens) {
+      buffer.append(")");
+    }
+
+    if (bQuery.getMinimumNumberShouldMatch()>0) {
+      buffer.append('~');
+      buffer.append(bQuery.getMinimumNumberShouldMatch());
+    }
+
+    if (bQuery.getBoost() != 1.0f) {
+      buffer.append(ToStringUtils.boost(bQuery.getBoost()));
+    }
+
+    return buffer.toString();
+  }
+
+}
Index: src/java/org/apache/nutch/searcher/Summary.java
===================================================================
--- src/java/org/apache/nutch/searcher/Summary.java (revision 681741)
+++ src/java/org/apache/nutch/searcher/Summary.java (working copy)
@@ -88,7 +88,7 @@
public boolean isEllipsis() { return true; }
}
- private ArrayList fragments = new ArrayList();
+ private ArrayList fragments = new ArrayList();
private static final Fragment[] FRAGMENT_PROTO = new Fragment[0];
@@ -100,7 +100,7 @@
/** Returns an array of all of this summary's fragments.*/
public Fragment[] getFragments() {
- return (Fragment[])fragments.toArray(FRAGMENT_PROTO);
+ return fragments.toArray(FRAGMENT_PROTO);
}
/** Returns a String representation of this Summary. */
@@ -126,7 +126,7 @@
Fragment fragment = null;
StringBuffer buf = new StringBuffer();
for (int i=0; i")
.append(encode ? Entities.encode(fragment.getText())
@@ -185,7 +185,7 @@
out.writeInt(fragments.size());
Fragment fragment = null;
for (int i=0; i");
+ Iterator>> iterator = doc.fieldIterator();
+ while (iterator.hasNext()) {
+ Map.Entry> entry = iterator.next();
+ List values = entry.getValue();
+ for (String value : values) {
+ addField(entry.getKey(), value);
+ }
+ }
+ docBuffer.append("");
+
+ if (++numBufferedDocs >= MAX_BUFFERED_DOCS) {
+ sendDocuments();
+ }
+ }
+
+ /**
+  * Sends the given query to the Solr select URL and returns the raw response.
+  *
+  * <p>The query, every requested field name and the sort field are
+  * URL-encoded as UTF-8 before being placed in the request URI, so values
+  * containing reserved characters cannot corrupt the query string.
+  *
+  * @param query Query in string form.
+  * @param numHits Maximum number of results to retrieve.
+  * @param fieldsToRetrieve Stored fields that will be retrieved for results;
+  *        may be null, and null elements are skipped.
+  * @param sortField Field to sort on, or null for no explicit sort.
+  * @param reverse If true, results are sorted in descending order.
+  * @return byte array containing the response from server.
+  * @throws IOException if the request fails or the HTTP status is not 200.
+  */
+ public byte[] search(String query, int numHits, String[] fieldsToRetrieve,
+     String sortField, boolean reverse)
+     throws IOException {
+   StringBuilder uri = new StringBuilder(solrSelectUrl);
+   uri.append("?q=");
+   uri.append(URLEncoder.encode(query, "UTF-8"));
+   uri.append("&rows=");
+   uri.append(numHits);
+   uri.append("&fl=");
+   if (fieldsToRetrieve != null) {
+     for (String field : fieldsToRetrieve) {
+       if (field != null) {
+         uri.append(URLEncoder.encode(field, "UTF-8"));
+         uri.append("%2C"); // encoded ','
+       }
+     }
+   }
+   if (sortField != null) {
+     uri.append("&sort=");
+     uri.append(URLEncoder.encode(sortField, "UTF-8"));
+     // '+' is the encoded form of the space between field and direction
+     uri.append(reverse ? "+desc" : "+asc");
+   }
+
+   // we don't need request parameters again
+   uri.append("&echoParams=none");
+
+   GetMethod get = new GetMethod(uri.toString());
+   try {
+     int code = httpClient.executeMethod(get);
+     // httpclient enforces reading response before sending another request,
+     // so the body is consumed even when the status turns out to be an error
+     byte[] response = get.getResponseBody();
+     if (code != 200) {
+       throw new IOException("Http response code is " + code + ", not 200.");
+     }
+     return response;
+   } catch (IOException e) {
+     throw e;
+   } catch (RuntimeException e) {
+     // surface unexpected failures as IOException, keeping the cause
+     IOException ioe = new IOException("Solr search request failed: " + e);
+     ioe.initCause(e);
+     throw ioe;
+   } finally {
+     // single release point; covers both the normal and the error path
+     get.releaseConnection();
+   }
+ }
+
+ /**
+ * Commit changes and optimize. Note: Solr will not serve newly added/removed
+ * documents until commit unless autoCommit=true.
+ *
+ * Any documents still held in the local buffer are sent first, then two
+ * separate messages are posted to the update URL via sendToSolrServer.
+ *
+ * @param block If true, this method will block until optimize is complete.
+ *        NOTE(review): this parameter is not read anywhere in the visible
+ *        body - confirm whether it is meant to be encoded in the messages.
+ * @throws IOException if sending the buffered documents or either message fails.
+ */
+ public void commitAndOptimize(boolean block) throws IOException {
+ // flush pending documents so they are included in the commit
+ if (numBufferedDocs > 0) {
+ sendDocuments();
+ }
+ // NOTE(review): both payload literals are empty in this copy of the patch;
+ // presumably they were the Solr commit and optimize XML messages - confirm
+ // against the original patch.
+ sendToSolrServer("");
+ sendToSolrServer("");
+ }
+
+ /**
+  * Issues a GET request against the configured ping URL.
+  *
+  * @return true when the server answers with HTTP status 200, false for any
+  *         other status.
+  * @throws IOException if the request cannot be executed or the response
+  *         body cannot be read.
+  */
+ public boolean ping() throws IOException {
+   final GetMethod pingRequest = new GetMethod(solrPingUrl);
+   try {
+     final int status = httpClient.executeMethod(pingRequest);
+     // the body is of no interest, but it is read so the response is consumed
+     pingRequest.getResponseBody();
+     final boolean alive = (status == 200);
+     return alive;
+   } finally {
+     pingRequest.releaseConnection();
+   }
+ }
+
+ /**
+ * Starts a new, empty batch: replaces the document buffer with a fresh
+ * StringBuilder (initial capacity 65536 chars), appends the batch prefix and
+ * resets the buffered-document counter to zero.
+ */
+ private void initDocBuffer() {
+ docBuffer = new StringBuilder(65536);
+ // NOTE(review): the literal is empty in this copy of the patch; presumably
+ // it was the opening tag of the Solr add message - confirm against the
+ // original patch.
+ docBuffer.append("");
+ numBufferedDocs = 0;
+ }
+
+ /**
+  * Tells whether a single UTF-16 code unit is acceptable as XML character
+  * data on its own.
+  *
+  * <p>Accepted values are tab (0x9), line feed (0xA), carriage return (0xD)
+  * and the ranges 0x20-0xD7FF and 0xE000-0xFFFD. Surrogate code units
+  * (0xD800-0xDFFF) are rejected. A {@code char} is 16 bits wide and can never
+  * hold a value of 0x10000 or above, so the former comparison against
+  * 0x10000-0x10FFFF was unreachable and has been removed.
+  *
+  * @param c the code unit to test.
+  * @return true if {@code c} is one of the accepted values.
+  */
+ private static boolean isLegalXml(final char c) {
+   if (c == 0x9 || c == 0xa || c == 0xd) {
+     return true;
+   }
+   return (c >= 0x20 && c <= 0xd7ff) || (c >= 0xe000 && c <= 0xfffd);
+ }
+
+ private void addField(String name, String value)
+ throws UnsupportedEncodingException {
+ docBuffer.append("");
+ StringBuilder valBuffer = new StringBuilder();
+ int valLen = value.length();
+ // escape illegal xml characters
+ for (int i = 0; i < valLen; i++) {
+ char ch = value.charAt(i);
+ switch (ch) {
+ case '"':
+ valBuffer.append(""");
+ case '\'':
+ valBuffer.append("'");
+ break;
+ case '&':
+ valBuffer.append("&");
+ break;
+ case '<':
+ valBuffer.append("<");
+ break;
+ case '>':
+ valBuffer.append(">");
+ break;
+ default:
+ // skip illegal characters we can't handle
+ if (isLegalXml(ch)) {
+ valBuffer.append(ch);
+ }
+ }
+ }
+ docBuffer.append(valBuffer.toString());
+ docBuffer.append("");
+ }
+
+ /**
+ * Posts the currently buffered documents to the server as one message and
+ * then starts a fresh buffer. Does nothing when no documents are buffered.
+ *
+ * @throws IOException if sending the message fails; in that case the buffer
+ *         is not re-initialized by this method.
+ */
+ private void sendDocuments() throws IOException {
+ if (numBufferedDocs == 0) {
+ return;
+ }
+ // NOTE(review): the literal is empty in this copy of the patch; presumably
+ // it was the closing tag matching the prefix written by initDocBuffer -
+ // confirm against the original patch.
+ docBuffer.append("");
+ sendToSolrServer(docBuffer.toString());
+ initDocBuffer();
+ }
+
+ /**
+  * Posts an XML message to the Solr update URL.
+  *
+  * <p>The message is sent as {@code text/xml} encoded in UTF-8, so non-ASCII
+  * content does not depend on the platform default charset.
+  *
+  * @param data the XML message to send.
+  * @throws IOException if the request fails or the HTTP status is not 200;
+  *         for a non-200 status the message includes the response body when
+  *         one was returned.
+  */
+ private void sendToSolrServer(String data) throws IOException {
+   PostMethod post = new PostMethod(solrUpdateUrl);
+   try {
+     post.setRequestEntity(new StringRequestEntity(data, "text/xml", "UTF-8"));
+     int code = httpClient.executeMethod(post);
+     // httpclient enforces reading response before sending another request
+     byte[] response = post.getResponseBody();
+     if (code != 200) {
+       // the body may be absent; never let that mask the status code
+       String body = (response == null) ? "" : new String(response, "UTF-8");
+       throw new IOException("Http response code is " + code + ", not 200. "
+           + "Response:" + body);
+     }
+   } catch (IOException e) {
+     throw e;
+   } catch (RuntimeException e) {
+     // surface unexpected failures as IOException, keeping the cause
+     IOException ioe = new IOException("Solr update request failed: " + e);
+     ioe.initCause(e);
+     throw ioe;
+   } finally {
+     // single release point; covers both the normal and the error path
+     post.releaseConnection();
+   }
+ }
+}
Index: src/java/org/apache/nutch/util/solr/SolrResponseHandler.java
===================================================================
--- src/java/org/apache/nutch/util/solr/SolrResponseHandler.java (revision 0)
+++ src/java/org/apache/nutch/util/solr/SolrResponseHandler.java (revision 0)
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util.solr;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nutch.searcher.SolrSearchBean.FieldType;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+public class SolrResponseHandler extends DefaultHandler {
+
+ private long numResults;
+ private List