# Origin: svn patch adding contrib/benchmark/nrtBench.py (revision 0),
# with property svn:eol-style = native.
"""Near-real-time (NRT) search benchmark driver for Lucene contrib/benchmark.

The script generates ``.alg`` files for
``org.apache.lucene.benchmark.byTask.Benchmark``, runs them through a ``java``
sub-process and parses the resulting log files.

Two modes (see ``usage``):

* ``-run <name>``    build the wiki index if needed, measure a search-only
  baseline, then measure searching while documents are added/updated and the
  reader is reopened at several rates.  Results go to ``<name>.txt`` and
  ``<name>.pk``.
* ``-report <name>`` load ``<name>.pk`` and print a Jira-markup table.

The code runs unchanged on Python 2 and Python 3: every ``print`` takes a
single parenthesised argument, which both versions render identically.
"""

import types
import re
import time
import os
import shutil
import sys
import datetime

try:
    import cPickle
except ImportError:
    # Python 3 has no cPickle module; pickle offers the same dump/load API.
    import pickle as cPickle

try:
    STRING_TYPES = types.StringTypes
except AttributeError:
    # types.StringTypes does not exist on Python 3.
    STRING_TYPES = (str,)

# TODO
# - build wiki/random index as needed (balanced or not, varying # segs, docs)
# - verify step
# - run searches
# - get all docs query in here

# 'darwin' contains 'win', so it has to be tested before the windows check.
if sys.platform.lower().find('darwin') != -1:
    osName = 'osx'
elif sys.platform.lower().find('win') != -1:
    osName = 'windows'
elif sys.platform.lower().find('linux') != -1:
    osName = 'linux'
else:
    osName = 'unix'

DEBUG = False

# let shell find it:
#JAVA_COMMAND = 'java -Xms2048M -Xmx2048M -verbose:gc'
#JAVA_COMMAND = 'java -Xms2048M -Xmx2048M -Xbatch -server'
JAVA_COMMAND = 'java -Xms2048M -Xmx2048M -server'
#JAVA_COMMAND = 'java -Xms1024M -Xmx1024M -Xbatch -server -XX:+AggressiveOpts -XX:CompileThreshold=100 -XX:+UseFastAccessorMethods'

INDEX_NUM_THREADS = 2
SEARCH_NUM_THREADS = 8

INDEX_NUM_DOCS = 5000000

LOG_DIR = 'logs'

DO_BALANCED = False

WIKI_FILE = '/x/lucene/enwiki-20090724-pages-articles.xml.bz2'
INDEX_DIR_BASE = '/x/lucene'

WIKI_LINE_FILE = '/x/lucene/enwiki-20090306-lines-1k-fixed.txt'

NUM_ROUND = 1

if DEBUG:
    RUN_TIME_SEC = 5.0
else:
    RUN_TIME_SEC = 60.0

# Disabled: run() performs the "ant compile" step instead.
if 0:
    print('compile...')
    if '-nocompile' not in sys.argv:
        if os.system('ant compile > compile.log 2>&1') != 0:
            raise RuntimeError('compile failed (see compile.log)')

# Template for the search benchmark; the $...$ placeholders are filled in by
# RunAlgs.getSearchAlg.
BASE_SEARCH_ALG = '''
#deletion.policy = org.apache.lucene.benchmark.utils.NoDeletionPolicy
analyzer=org.apache.lucene.analysis.standard.StandardAnalyzer
directory=FSDirectory
work.dir = $INDEX$
search.num.hits = $NUM_HITS$
query.maker=org.apache.lucene.benchmark.byTask.feeds.FileBasedQueryMaker
file.query.maker.file = queries.txt

docs.file = $WIKI_LINE_FILE$
content.source=org.apache.lucene.benchmark.byTask.feeds.LineDocSource
compound = false
#writer.info.stream = SystemOut

doc.stored=true
doc.body.stored=false
doc.tokenized=false
doc.body.tokenized=true

log.queries=true
log.step=100000
content.source.forever = false
{
 OpenIndex()
 {
 $NRT$
 # Warm
 { "XSearchWarm" $WARM$ }
 $INDEXING$
 $SEARCHING$
 Wait($RUN_TIME$)
 }
 CloseReader

 # Do not keep any changes, so we can re-test on the same index again
 RollbackIndex
 NewRound
} : $NUM_ROUND$


RepSumByPrefRound X
RepSumByPrefRound NearRealtime
'''

# Template for index creation; the $...$ placeholders are filled in by
# RunAlgs.makeIndex.
BASE_INDEX_ALG = '''
analyzer=org.apache.lucene.analysis.standard.StandardAnalyzer

$OTHER$
deletion.policy = org.apache.lucene.benchmark.utils.NoDeletionPolicy
doc.tokenized = false
doc.body.tokenized = true
doc.stored = true
doc.body.stored = false
doc.term.vector = false
log.step.AddDoc=10000

directory=FSDirectory
autocommit=false
compound=false

work.dir=$WORKDIR$

{ "BuildIndex"
 - CreateIndex
 $INDEX_LINE$
 - CommitIndex(dp0)
 - CloseIndex
 $DELETIONS$
}

RepSumByPrefRound BuildIndex
'''


def _toBytes(text):
    """Return text UTF-8 encoded on Python 3; unchanged otherwise."""
    if sys.version_info[0] >= 3 and isinstance(text, str):
        return text.encode('utf-8')
    return text


def _writeFile(path, text):
    """Write text to path in binary mode and close the file."""
    f = open(path, 'wb')
    try:
        f.write(_toBytes(text))
    finally:
        f.close()


def _readFile(path):
    """Read path in binary mode and return its content as a native str."""
    f = open(path, 'rb')
    try:
        data = f.read()
    finally:
        f.close()
    if not isinstance(data, str):
        # Python 3: bytes -> str so the substring checks below keep working.
        data = data.decode('utf-8', 'replace')
    return data


def _colorPct(newValue, oldValue):
    """Format the relative change new/old as colored Jira markup."""
    pct = 100.0 * (newValue / oldValue - 1.0)
    if pct < 0.0:
        color = 'red'
    else:
        color = 'green'
    return '{color:%s}%.1f%%{color}' % (color, pct)


def getRate(line):
    """Return the fifth field of a benchmark report line as a float.

    Hyphens are turned into spaces before splitting, so '-' filler in the
    report never counts as a field; thousands separators (',') are removed
    from the value.

    Raises RuntimeError if the line has fewer than five fields.
    """
    fields = line.replace('-', ' ').split()
    if len(fields) < 5:
        raise RuntimeError('did not find qps or dps')
    return float(fields[4].replace(',', ''))


class RunAlgs:
    """Builds .alg files, runs them through java and collects the results."""

    def __init__(self, resultsPrefix):
        """Open '<resultsPrefix>.txt', which receives one row per compare()."""
        self.counter = 0
        self.results = []
        self.fOut = open('%s.txt' % resultsPrefix, 'wb')

    def makeIndex(self, label, source, numDocs, dir=None, balancedNumSegs=None, deletePcts=None):
        """Create the index under INDEX_DIR_BASE unless it already exists.

        source must be 'wiki' or 'random'.  If dir is given the process first
        changes into '<dir>/contrib/benchmark'.  Returns the index name
        (relative to INDEX_DIR_BASE).  A partially built index is removed if
        the build fails.
        """
        if source not in ('wiki', 'random'):
            raise RuntimeError('source must be wiki or random')

        if dir is not None:
            fullDir = '%s/contrib/benchmark' % dir
            if DEBUG:
                print(' chdir %s' % fullDir)
            os.chdir(fullDir)

        if label is None:
            indexName = '%s.%gM' % (source, numDocs/1000000.0)
        else:
            indexName = '%s.%s.%gM' % (source, label, numDocs/1000000.0)

        if balancedNumSegs is not None:
            indexName += '_balanced%d' % balancedNumSegs

        fullIndexPath = '%s/%s' % (INDEX_DIR_BASE, indexName)

        print('path %s' % fullIndexPath)

        if os.path.exists(fullIndexPath):
            print('Index %s already exists...' % fullIndexPath)
            return indexName

        print('Now create index %s...' % fullIndexPath)

        s = BASE_INDEX_ALG

        if source == 'wiki':
            other = ('doc.index.props = true\n'
                     'content.source=org.apache.lucene.benchmark.byTask.feeds.EnwikiContentSource\n'
                     'docs.file=%s\n') % WIKI_FILE
            #addDoc = 'AddDoc(1024)'
            addDoc = 'AddDoc'
        else:
            other = ('doc.index.props = true\n'
                     'content.source=org.apache.lucene.benchmark.byTask.feeds.SortableSingleDocSource\n')
            addDoc = 'AddDoc'

        if INDEX_NUM_THREADS > 1:
            #other += 'doc.reuse.fields=false\n'
            # '//' keeps the per-thread doc count an integer on Python 3 too.
            s = s.replace('$INDEX_LINE$', '[ { "AddDocs" %s > : %s } : %s' %
                          (addDoc, numDocs // INDEX_NUM_THREADS, INDEX_NUM_THREADS))
        else:
            s = s.replace('$INDEX_LINE$', '{ "AddDocs" %s > : %s' %
                          (addDoc, numDocs))

        s = s.replace('$WORKDIR$', fullIndexPath)

        if deletePcts is not None:
            dp = '# Do deletions\n'
            dp += 'OpenReader(false)\n'
            for pct in deletePcts:
                # 0% is already covered by CommitIndex(dp0) in the template.
                if pct != 0:
                    dp += 'DeleteByPercent(%g)\n' % pct
                    dp += 'CommitIndex(dp%g)\n' % pct
            dp += 'CloseReader()\n'
        else:
            dp = ''

        s = s.replace('$DELETIONS$', dp)

        if balancedNumSegs is not None:
            other += (' merge.factor=1000\n'
                      ' max.buffered=%d\n'
                      ' ram.flush.mb=2000\n'
                      ' ') % (numDocs // balancedNumSegs)
        else:
            if source == 'random':
                other += 'ram.flush.mb=1.0\n'
            else:
                other += 'ram.flush.mb=32.0\n'

        s = s.replace('$OTHER$', other)

        try:
            self.runOne(s, 'index_%s' % indexName, isIndex=True)
        except:
            # Deliberately bare: also clean up on Ctrl-C, then re-raise.
            if os.path.exists(fullIndexPath):
                shutil.rmtree(fullIndexPath)
            raise

        return indexName

    def getLogPrefix(self, **dArgs):
        """Return 'k1=v1_k2=v2...' (keys sorted) with ' ' and '"' as '_'."""
        s = '_'.join(['%s=%s' % tup for tup in sorted(dArgs.items())])
        s = s.replace(' ', '_')
        s = s.replace('"', '_')
        return s

    def runOne(self, alg, logFileName, isIndex=False, dir=None, queries=None):
        """Run one .alg through the benchmark and inspect its log.

        alg is the text of the algorithm file; output of the java process is
        redirected to '<fullDir>/<LOG_DIR>/<logFileName>'.  queries (a string
        or a list of strings) is written to queries.txt when given.

        Returns None when isIndex is true, otherwise the tuple
        (bestQPS, bestDPS, bestNRT) from _parseSearchLog.

        Raises RuntimeError if the java command fails or the log shows an
        exception.
        """
        if dir is None:
            fullDir = '.'
        else:
            fullDir = '%s/contrib/benchmark' % dir

        if fullDir != '.':
            if DEBUG:
                print(' chdir %s' % fullDir)
            os.chdir(fullDir)

        if queries is not None:
            if isinstance(queries, STRING_TYPES):
                queries = [queries]
            _writeFile('%s/queries.txt' % fullDir, '\n'.join(queries))

        if DEBUG:
            algFile = 'tmp.alg'
        else:
            algFile = 'tmp.%s.alg' % os.getpid()
        _writeFile(algFile, alg)

        fullLogFileName = '%s/%s/%s' % (fullDir, LOG_DIR, logFileName)
        print(' log: %s' % fullLogFileName)
        if not os.path.exists(LOG_DIR):
            print(' mkdir %s' % LOG_DIR)
            os.makedirs(LOG_DIR)

        command = '%s -classpath ../../build/classes/java:../../build/classes/demo:../../build/contrib/highlighter/classes/java:lib/commons-digester-1.7.jar:lib/commons-collections-3.1.jar:lib/commons-compress-1.0.jar:lib/commons-logging-1.0.4.jar:lib/commons-beanutils-1.7.0.jar:lib/xerces-2.9.0.jar:lib/xml-apis-2.9.0.jar:../../build/contrib/benchmark/classes/java org.apache.lucene.benchmark.byTask.Benchmark %s > "%s" 2>&1' % (JAVA_COMMAND, algFile, fullLogFileName)

        if DEBUG:
            print('command=%s' % command)

        success = False
        try:
            if os.system(command) != 0:
                raise RuntimeError('FAILED')
            success = True
        finally:
            if not DEBUG:
                if not success:
                    # Keep the failing alg around as tmp.alg for inspection.
                    if os.path.exists('tmp.alg'):
                        os.remove('tmp.alg')
                    os.rename(algFile, 'tmp.alg')
                else:
                    os.remove(algFile)

        logText = _readFile(fullLogFileName)

        if isIndex:
            if logText.find('Exception in thread "') != -1 or logText.find('at org.apache.lucene') != -1:
                raise RuntimeError('alg hit exceptions')
            return

        return self._parseSearchLog(logText)

    def _parseSearchLog(self, logText):
        """Extract (bestQPS, bestDPS, bestNRT) from the text of a search log.

        Rates are read with getRate from lines starting with 'XSearchReal'
        (qps), 'XIndexing' (dps) and 'NearRealtimeReader(' (nrt).  Exactly
        NUM_ROUND qps values are required; dps and nrt may be absent, in
        which case None is returned for them.  The dps/nrt values returned
        belong to the round with the highest qps.

        Raises RuntimeError on a logged exception or a wrong round count.
        """
        qps = []
        nrt = []
        dps = []

        for line in logText.split('\n'):

            if 'Exception' in line or 'at org.apache.lucene' in line:
                raise RuntimeError('alg hit exception')

            if line.startswith('XSearchReal'):
                qps.append(getRate(line))
            elif line.startswith('XIndexing'):
                dps.append(getRate(line))
            elif line.startswith('NearRealtimeReader('):
                nrt.append(getRate(line))

        if len(qps) != NUM_ROUND:
            raise RuntimeError('did not find %s rounds (qps) (got %s)' % (NUM_ROUND, len(qps)))

        if len(dps) not in (0, NUM_ROUND):
            raise RuntimeError('did not find %s rounds (dps) (got %s)' % (NUM_ROUND, len(dps)))

        if len(nrt) not in (0, NUM_ROUND):
            raise RuntimeError('did not find %s rounds (nrt) (got %s)' % (NUM_ROUND, len(nrt)))

        bestQPS = None
        bestNRT = None
        bestDPS = None

        for i in range(NUM_ROUND):
            if bestQPS is None or qps[i] > bestQPS:
                bestQPS = qps[i]
                # dps and nrt are optional independently of each other.
                if len(dps) != 0:
                    bestDPS = dps[i]
                if len(nrt) != 0:
                    bestNRT = nrt[i]

        if bestQPS is None:
            raise RuntimeError('did not find any QPS')

        return bestQPS, bestDPS, bestNRT

    def getSearchAlg(self, indexPath, searchTask,
                     runTimeSec, indexingMode,
                     nrtReopenSec,
                     numHits):
        """Return BASE_SEARCH_ALG with all placeholders filled in.

        indexingMode is None (no indexing) or a (mode, ratePerSec) pair with
        mode 'add' or 'update'; any other mode raises RuntimeError.
        nrtReopenSec is None (plain OpenReader) or the argument passed to
        the NearRealtimeReader task.
        """
        s = BASE_SEARCH_ALG
        s = s.replace('$NUM_ROUND$', str(NUM_ROUND))
        s = s.replace('$WIKI_LINE_FILE$', WIKI_LINE_FILE)

        if indexingMode is not None:
            mode, ratePerSec = indexingMode
            if mode == 'add':
                # update docs (only add docs)
                x = 'AddDoc'
            elif mode == 'update':
                # update docs (deletions)
                x = 'UpdateDoc'
            else:
                raise RuntimeError('unknown indexing mode %s' % str(indexingMode))

            # The rate is given per minute and split across the threads.
            idx = '[ "XIndexing" { %s > : * : %d/min] : %d &' % (x, ratePerSec*60 // INDEX_NUM_THREADS, INDEX_NUM_THREADS)
            if mode == 'update':
                s = ('doc.random.id.limit=%d\n' % INDEX_NUM_DOCS) + s
        else:
            # no indexing
            idx = ''

        s = s.replace('$INDEXING$', idx)
        s = s.replace('$RUN_TIME$', '%g' % runTimeSec)
        s = s.replace('$WARM$', searchTask)
        s = s.replace('$SEARCHING$', '[ "XSearchReal" { %s > : * ] : %d &' %
                      (searchTask, SEARCH_NUM_THREADS))

        if nrtReopenSec is None:
            x = 'OpenReader'
        else:
            x = 'NearRealtimeReader(%g) &' % nrtReopenSec
        s = s.replace('$NRT$', x)
        s = s.replace('$INDEX$', '%s/%s' % (INDEX_DIR_BASE, indexPath))
        s = s.replace('$NUM_HITS$', str(numHits))

        return s

    def compare(self, baseline, new, *params):
        """Record one run against the baseline.

        baseline and new are (qps, dps, nrt) tuples.  Prints the qps change
        in percent, appends (baseline, new, params) to self.results and
        writes a '|'-separated row to the results text file.
        """
        qpsOld = baseline[0]
        qpsNew = new[0]
        pct = 100.0*(qpsNew-qpsOld)/qpsOld
        print(' diff: %.1f%%' % pct)
        self.results.append((baseline, new, params))

        self.fOut.write(_toBytes('|%s|%.2f|%.2f|%.1f%%|\n' %
                                 ('|'.join(str(x) for x in params),
                                  qpsOld, qpsNew, pct)))
        self.fOut.flush()

    def save(self, name):
        """Pickle self.results to '<name>.pk'."""
        f = open('%s.pk' % name, 'wb')
        try:
            cPickle.dump(self.results, f)
        finally:
            f.close()


def usage():
    """Print the command line help and exit with status 1."""
    print('')
    print('Usage: python -u %s -run <name> | -report <name>' % sys.argv[0])
    print('')
    print(' -run <name> runs all tests, saving results to file <name>.pk')
    print(' -report <name> opens <name>.pk and prints Jira table')
    print(' -nocompile [skip "ant compile"]')
    print('')
    sys.exit(1)


def main():
    """Parse sys.argv and dispatch to run() or report()."""
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)

    if '-run' in sys.argv:
        flag = '-run'
        mode = 'run'
    elif '-report' in sys.argv:
        flag = '-report'
        mode = 'report'
    else:
        usage()

    # The results name is the argument that follows the mode flag.
    i = sys.argv.index(flag)
    if i < len(sys.argv)-1:
        name = sys.argv[1+i]
    else:
        usage()

    if mode in ('run',):
        run(mode, name)
    else:
        report(name)


def report(name):
    """Load '<name>.pk' and print the results as a Jira table.

    Rows are keyed by (indexing rate, reopen period); each row needs both an
    'add' and an 'update' measurement, otherwise RuntimeError is raised.
    """
    printVersions()

    # NOTE: unpickling can execute arbitrary code; only load .pk files that
    # were written by RunAlgs.save.
    f = open('%s.pk' % name, 'rb')
    try:
        results = cPickle.load(f)
    finally:
        f.close()

    # collate
    d = {}
    printedBaseline = False
    for baseline, new, params in results:
        qpsOld = baseline[0]
        qpsNew = new[0]
        if not printedBaseline:
            print('')
            print('Baseline QPS %s' % qpsOld)
            print('')
            printedBaseline = True
        indexingMode, indexingRate, reopenRate = params
        tup = (indexingRate, reopenRate)
        if tup not in d:
            d[tup] = [None, None]
        if indexingMode == 'add':
            d[tup][0] = (qpsOld, qpsNew)
        else:
            d[tup][1] = (qpsOld, qpsNew)

    print('||Indexing docs/sec||NRT reopen period (sec)||QPS add||QPS update||QPS add (% diff)||QPS update (% diff)||')
    for (indexingRate, reopenRate), pair in sorted(d.items()):
        if pair[0] is None or pair[1] is None:
            raise RuntimeError('missing add or update result for indexing rate %s, reopen period %s' %
                               (indexingRate, reopenRate))
        qpsAddOld, qpsAddNew = pair[0]
        qpsUpdateOld, qpsUpdateNew = pair[1]
        # Both modes are compared against the same baseline run.
        assert qpsAddOld == qpsUpdateOld

        a = _colorPct(qpsAddNew, qpsAddOld)
        b = _colorPct(qpsUpdateNew, qpsUpdateOld)

        print('|%g|%g|%.1f|%.1f|%s|%s|' %
              (indexingRate, reopenRate, qpsAddNew, qpsUpdateNew, a, b))


def printVersions():
    """Print the output of 'java -version' and a description of the OS."""
    print('')
    print('JAVA:\n%s' % os.popen('java -version 2>&1').read())

    print('')
    if osName != 'windows':
        print('OS:\n%s' % os.popen('uname -a 2>&1').read())
    else:
        print('OS:\n%s' % sys.platform)


def run(mode, name):
    """Run the baseline and all indexing-rate / reopen-period combinations.

    Unless '-nocompile' is on the command line, 'ant compile' runs first.
    Each measurement is compared with the baseline and the accumulated
    results are saved to '<name>.pk' (skipped when DEBUG is set).
    """
    if '-nocompile' not in sys.argv:
        print('"ant compile"...')
        if os.system('ant compile') != 0:
            raise RuntimeError('ant compile failed')

    r = RunAlgs(name)

    if not os.path.exists(WIKI_FILE):
        print('')
        print('ERROR: wiki source file "%s" does not exist' % WIKI_FILE)
        print('')
        sys.exit(1)

    printVersions()

    indexPath = r.makeIndex(None, 'wiki', INDEX_NUM_DOCS)

    numHits = 10

    queries = '1'

    # overall baseline -- just searching w/ no updates, no NRT reopens
    print('')
    print('RUN: baseline')
    s = r.getSearchAlg(indexPath, 'Search', RUN_TIME_SEC,
                       None, None, numHits)
    baseline = r.runOne(s, 'baseline', queries=queries)
    print(' baseline qps = %.2f/sec' % baseline[0])

    for indexingRate in (1, 10, 100, 1000):

        for nrtReopenPeriod in (0.1, 0.5, 1.0, 2.5, 5.0):

            if indexingRate == 1 and nrtReopenPeriod < 1.0:
                continue

            for indexingMode in ('update', 'add'):

                prefix = r.getLogPrefix(indexingMode=indexingMode,
                                        indexingRate=indexingRate,
                                        nrtReopenPeriod=nrtReopenPeriod)
                print('\nRUN: indexing=%s (rate %s docs/sec) nrtReopenPeriod=every %g sec' %
                      (indexingMode, indexingRate, nrtReopenPeriod))

                s = r.getSearchAlg(indexPath, 'Search', RUN_TIME_SEC,
                                   (indexingMode, indexingRate),
                                   nrtReopenPeriod, numHits)
                result = r.runOne(s, 'nrt_%s' % prefix, queries=queries)
                print(' qps = %.2f/sec, dps = %.2f/sec, nrtRate = every %.2f sec' %
                      (result[0], result[1], 1.0/result[2]))

                if not DEBUG:
                    r.compare(baseline, result, indexingMode, indexingRate, nrtReopenPeriod)
                    # Saved after every run so partial results survive a crash.
                    r.save(name)


def cleanScores(l):
    """Strip, in place, everything from ' score=' onwards in each string.

    Entries without ' score=' are only stripped of surrounding whitespace.
    """
    for i in range(len(l)):
        pos = l[i].find(' score=')
        if pos != -1:
            l[i] = l[i][:pos]
        l[i] = l[i].strip()


if __name__ == '__main__':
    main()