
Type: New Feature

Status: Closed

Priority: Major

Resolution: Fixed

Affects Version/s: 0.6

Fix Version/s: 0.6

Component/s: Clustering

Labels:
The current LDA implementation in Mahout suffers from a few issues:
1) it's based on the original Variational Bayes E/M training methods of Blei et al (http://www.cs.princeton.edu/~blei/papers/BleiNgJordan2003.pdf), which are a) significantly more complex to implement/maintain, and b) significantly slower than subsequently discovered techniques
2) the entire "current working model" is held in memory in each Mapper, which limits the scalability of the implementation: numTerms in vocabulary * numTopics * 8 bytes per double has to be less than the mapper heap size.
3) the sufficient statistics which need to be emitted by the mappers scale as numTopics * numNonZeroEntries in the corpus. Even with judicious use of Combiners (currently implemented), this can get prohibitively expensive in terms of network + disk usage.
In particular, point 3 looks like this: a 1B nonzero entry corpus in Mahout would take up about 12GB of RAM in total, but if you wanted 200 topics, you'd be using 2.5TB of disk+network traffic per E/M iteration. Running a moderate 40 iterations, we're talking about 100TB. Having tried this implementation on a 6B nonzero entry input corpus with 100 topics (500k term vocabulary, so memory wasn't an issue), I've seen this in practice: even on our production Hadoop cluster with many thousands of map slots available, a single iteration was taking more than 3.5 hours to get to 50% completion of the mapper tasks.
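The estimates above can be roughly reproduced with a few lines of arithmetic. This is only a back-of-the-envelope sketch: the 12 bytes per nonzero entry (a 4-byte int index plus an 8-byte double) is my assumption for where the "about 12GB" comes from, the function names are made up for illustration, and combiner savings are ignored.

```python
# Back-of-the-envelope sizing for points 2) and 3).
BYTES_PER_DOUBLE = 8
BYTES_PER_ENTRY = 12  # assumption: 4-byte int index + 8-byte double value

GB = 10**9
TB = 10**12


def model_bytes(num_terms, num_topics):
    """Size of a dense numTerms x numTopics model of doubles (point 2)."""
    return num_terms * num_topics * BYTES_PER_DOUBLE


def corpus_bytes(num_nonzero):
    """Size of the sparse corpus under the 12-bytes-per-entry assumption."""
    return num_nonzero * BYTES_PER_ENTRY


def shuffle_bytes_per_iteration(num_nonzero, num_topics):
    """Sufficient statistics emitted per iteration (point 3), no combiner savings."""
    return corpus_bytes(num_nonzero) * num_topics


if __name__ == "__main__":
    print(corpus_bytes(10**9) / GB, "GB corpus")
    print(shuffle_bytes_per_iteration(10**9, 200) / TB, "TB per iteration")
    print(40 * shuffle_bytes_per_iteration(10**9, 200) / TB, "TB for 40 iterations")
    print(model_bytes(500_000, 100) / 10**6, "MB model (500k terms, 100 topics)")
```

Under these assumptions the numbers come out at 12GB, 2.4TB per iteration and 96TB for 40 iterations, in line with the rounded figures quoted above.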
Point 1) was simple to improve: switch from VB to an algorithm labeled CVB0 ("Collapsed Variational Bayes, 0th derivative approximation") in Asuncion, et al. ( http://www.datalab.uci.edu/papers/uai_2009.pdf ). I tried many approaches to get the overall distributed side of the algorithm to scale better, originally aiming at removing point 2), but it turned out that point 3) was what kept rearing its ugly head. The way that YahooLDA ( https://github.com/shravanmn/Yahoo_LDA ) and many others have achieved high scalability is by doing distributed Gibbs sampling, but that requires that you hold onto the model in distributed memory and query it continually via RPC. This could be done in something like Giraph or Spark, but not in vanilla Hadoop M/R.
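To give a feel for why CVB0 is simpler than the VB E/M updates, here is a minimal sketch of a single-token CVB0 update as I understand it from the paper linked above. It is not the code in the patch: the function name, argument names and the choice to pass counts that already include the token's own contribution are all assumptions made for illustration.

```python
def cvb0_update(gamma, n_wk, n_k, n_jk, alpha, eta, vocab_size):
    """One CVB0 update of the topic responsibilities for a single token.

    gamma: current responsibilities of this token over topics (sums to 1)
    n_wk:  expected count of this token's word in each topic
    n_k:   expected total token count in each topic
    n_jk:  expected count of each topic in this token's document
    All three count vectors are assumed to include this token's own gamma.
    """
    unnormalized = []
    for k in range(len(gamma)):
        # Remove this token's own contribution before re-estimating it.
        word_topic = n_wk[k] - gamma[k]
        topic_total = n_k[k] - gamma[k]
        doc_topic = n_jk[k] - gamma[k]
        unnormalized.append(
            (word_topic + eta) * (doc_topic + alpha)
            / (topic_total + vocab_size * eta)
        )
    total = sum(unnormalized)
    return [u / total for u in unnormalized]


if __name__ == "__main__":
    # Two topics; the word is much more common in topic 0 than in topic 1.
    new_gamma = cvb0_update(
        gamma=[0.5, 0.5],
        n_wk=[5.5, 0.5],
        n_k=[10.5, 10.5],
        n_jk=[3.5, 3.5],
        alpha=0.1,
        eta=0.1,
        vocab_size=10,
    )
    print(new_gamma)  # responsibility shifts towards topic 0
```

The whole update is a product of smoothed counts followed by a normalization, with no digamma functions or inner fixed-point loop.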
The end result was to actually make point 2) even worse: instead of relying on Hadoop combiners to aggregate sufficient statistics for the model, you do a full map-side cache of (this mapper's slice of) the next iteration's model, emit nothing in each map() call, and emit the entire model at cleanup(). The reducer then simply sums the submodels. This effectively becomes a form of ensemble learning: each mapper learns its own sequential model and emits it, and the reducers (one for each topic) sum these models into one, which is fed out to all the mappers in the next iteration.
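The map-side caching pattern described above can be simulated in a few lines of plain Python. This is a toy sketch, not the Hadoop code in the patch: CachingMapper, reduce_topic, run_iteration and the infer callback (standing in for the per-document inference step) are hypothetical names, and the sketch leaves out the part where each mapper updates its own working model as it goes.

```python
from collections import defaultdict


class CachingMapper:
    """Accumulates this mapper's slice of the next model; emits only at cleanup."""

    def __init__(self, num_topics, infer):
        # infer is a hypothetical callback: doc -> {term: [p(topic) per topic]}
        self.infer = infer
        self.next_model = [defaultdict(float) for _ in range(num_topics)]

    def map(self, doc):
        # doc is {term: count}. Counts are folded into the cache, nothing is emitted.
        for term, topic_probs in self.infer(doc).items():
            for topic, p in enumerate(topic_probs):
                self.next_model[topic][term] += doc[term] * p
        return []

    def cleanup(self):
        # Emit one (topic, {term: expected count}) record per topic.
        return [(topic, dict(row)) for topic, row in enumerate(self.next_model)]


def reduce_topic(partial_rows):
    """Sum the rows that all mappers emitted for one topic."""
    total = defaultdict(float)
    for row in partial_rows:
        for term, value in row.items():
            total[term] += value
    return dict(total)


def run_iteration(splits, num_topics, infer):
    """Simulate one M/R iteration: one mapper per split, one reducer per topic."""
    shuffled = defaultdict(list)
    for split in splits:
        mapper = CachingMapper(num_topics, infer)
        for doc in split:
            mapper.map(doc)
        for topic, row in mapper.cleanup():
            shuffled[topic].append(row)
    return [reduce_topic(shuffled[t]) for t in range(num_topics)]


if __name__ == "__main__":
    # Placeholder inference: every term is split evenly between two topics.
    uniform = lambda doc: {term: [0.5, 0.5] for term in doc}
    splits = [[{"a": 2, "b": 1}], [{"a": 4}]]
    print(run_iteration(splits, num_topics=2, infer=uniform))
```

In this sketch the number of records crossing the shuffle is numMappers * numTopics, independent of how many nonzero entries each mapper processed, which is the point of trading mapper memory for network and disk traffic.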
In its current form, this LDA implementation can churn through about two M/R iterations per hour on the same cluster/data set mentioned above (which makes it at least 15x faster on larger data sets).
It probably requires a fair amount of documentation / cleanup, but it comes with a nice end-to-end unit test (same as the one added to MAHOUT-399), and also comes with an "in-memory" version of the same algorithm, for smaller datasets (i.e. those which can fit in memory).