Details
Description
See attached pdf for outline of proposed method.
All comments are welcome.

 ssvdCDH3or0.21.patch.gz
 69 kB
 dlyubimov2

 ssvdCDH3or0.21.patch.gz
 68 kB
 dlyubimov2

 ssvdCDH3or0.21.patch.gz
 25 kB
 dlyubimov2

 ssvdCDH3or0.21.patch.gz
 24 kB
 dlyubimov2

 SSVD working notes.pdf
 1.09 MB
 dlyubimov2

 ssvdCDH3or0.21.patch.gz
 24 kB
 dlyubimov2

 ssvdCDH3or0.21.patch.gz
 24 kB
 dlyubimov2

 SSVD working notes.pdf
 1.03 MB
 dlyubimov2

 ssvdCDH3or0.21.patch.gz
 23 kB
 dlyubimov2

 ssvdCDH3or0.21.patch.gz
 23 kB
 dlyubimov2

 ssvdCDH3or0.21.patch.gz
 23 kB
 dlyubimov2

 SSVD working notes.pdf
 1022 kB
 dlyubimov2

 ssvdm3.patch.gz
 21 kB
 dlyubimov2

 SSVD working notes.pdf
 1020 kB
 dlyubimov2

 ssvdm2.patch.gz
 20 kB
 dlyubimov2

 SSVD working notes.pdf
 1015 kB
 dlyubimov2

 ssvdm1.patch.gz
 18 kB
 dlyubimov2

 QR decomposition for Map.pdf
 359 kB
 dlyubimov2

 QR decomposition for Map.pdf
 350 kB
 dlyubimov2

 QR decomposition for Map.pdf
 310 kB
 dlyubimov2

 sd.tex
 11 kB
 Ted Dunning

 sd.pdf
 151 kB
 Ted Dunning

 sd.tex
 10 kB
 Ted Dunning

 sd.pdf
 153 kB
 Ted Dunning

 sd.pdf
 146 kB
 Ted Dunning

 sd.tex
 8 kB
 Ted Dunning

 Modified stochastic svd algorithm for mapreduce.pdf
 341 kB
 dlyubimov2

 MAHOUT376.patch
 22 kB
 Ted Dunning

 ASF.LICENSE.NOT.GRANTEDStochastic SVD using eigensolver trick.pdf
 245 kB
 Dmitriy Lyubimov

 ASF.LICENSE.NOT.GRANTEDsd.pdf
 136 kB
 Ted Dunning

 ASF.LICENSE.NOT.GRANTEDsdbib.bib
 0.3 kB
 Ted Dunning

 ASF.LICENSE.NOT.GRANTEDsd.tex
 5 kB
 Ted Dunning
Issue Links
 duplicates

MAHOUT309 Implement Stochastic Decomposition
 Closed
 is related to

MAHOUT623 Bug/improvement: add "overwrite" option to Stochastic SVD command line and API
 Closed
 relates to

MAHOUT593 Backport of Stochastic SVD patch (Mahout376) to hadoop 0.20 to ensure compatibility with current Mahout dependencies.
 Closed
Activity
Per Ted's request, i am attaching a conspectus of our previous discussion of Ted's suggested mods to Tropp's stochastic svd. It doesn't include Q orthonormalization.
Here is a workinprogress patch that illustrates how I plan to do the stochastic multiplication.
For moderate sized problems, this will be the major step required since all of the dense intermediate products will fit in memory. For larger problems, additional tricks will be necessary.
More minor comments:
 Don't forget a copyright header for new files
 murmurInt should inline those constants or declare them constants
 More VirtualRandomMatrix fields ought be final, conceptually
 VirtualRandomVector doesn't need "size"
 "Utils" classes ought to have a private constructor IMHO
Postponing since it is still too raw.
i am chipping out on mapper implementation on weekends. so far i got to Q orthogonalization, and should be able to produce BB^t at the next one.
I'll post a patch when there's something working, but it is a slow progress due to my load. My employer at the moment is not eager to advance tasks that may depend on this, so i have to do it on my own time.
I have couple of doubts. I do amended GramSchmidt for the blocks of Y to produce blocks of Q, but while Q would end up orthonormal, i am not sure that Q and Y would end up spanning the same space. Although the fact that Y is random product means Q may also be more or less random basis so maybe it doesn't matter so much that span(Q)=exactly span(Y).
Second concern is still the situation when last split producted by MR doesn't have minimally sufficient k+p records of A for producing orthogonal Q. The ideal outcome is then just to add it to another split, but i can't figure an easy enough way to do that within MR framework (esp. if the input is serialized using compressed sequence file). one way is to do custom split indexing based on # of records encountered (similar to what that lzo MR project does). but it sounds too complicated to me. Another way is just to do a prepass over A and prepartition it the way that this condition is satisfied. Then have a custom split so that there's 1 mapper per partition. But that's still one additional preprocessing step which we'd make just for the sake of just a fraction of A. Ideas are welcome here.
I have couple of doubts. I do amended GramSchmidt for the blocks of Y to produce blocks of Q, but while Q would end up orthonormal, i am not sure that Q and Y would end up spanning the same space. Although the fact that Y is random product means Q may also be more or less random basis so maybe it doesn't matter so much that span(Q)=exactly span(Y).
Since Q is orthonormal and since QR = Y, Q is exactly a basis of Y. The only issue is that R isn't really right triangular. That doesn't matter here.
Second concern is still the situation when last split producted by MR doesn't have minimally sufficient k+p records of A for producing orthogonal Q. The ideal outcome is then just to add it to another split, but i can't figure an easy enough way to do that within MR framework (esp. if the input is serialized using compressed sequence file).
If necessary, the input format can just note when the last block is small compared to previous blocks and round up on the next to last block.
Actually, i reviewed getSplits() code for sequence file. it seems to honor minSplitSize property of the FileInputFormat. What's more, it makes sure that the last block is no less than 1.1 times min split size. So that should work nicely. if we get insufficient # of rows in mappers, i guess we can just increase the minSplitSize. Unless input is partitioned so that min split size > min file size in the input.
Another detail in Q computation:
For now we assume that Q=(1/L)Q', where Q' is the output of block orthonormalization, and L is the number of blocks we ended up with.
However, in case when A is rather sparse, some Y blocks may not end up spanning R^(k+p) in which case orthogonalization would not produce normal columns. It doesn't mean that we can't orthonormalize the entire Y though. It just means that L is not for the entire Q but rather is individual for every column vector of Q and our initial assumption Q=(1/L)Q' doesn't always hold.
Which is fine per se, but then it means that there are perhaps quite frequent exceptions to assumption B =(1/L) Q'^transpose * A. It would be easy to compute B^transpose in the same mapreduce pass as Q using multiple outputs , one for Q and one for Btranspose (which is what i am doing right now). But unless there's any workaround for the problem above, this B^transpose is incorrect for some sparse cases.
Ted, any thoughts? Thank you.
Dima
I think that what you are saying is that some of the A_i blocks that make up A may be rank deficient.
That is definitely a risk if the number of rows in A_i is small compared to the size of Q_i. If the number of rows of A_i is much larger than the size of Q_i, then that is very, very unlikely.
Regardless, we will still have Q*Q = I and Q_i will still span the projection of A_i even if A_i and thus A_i \Omega is rank deficient. Thus Q will still span A \Omega. This gives us Q Q* A \approx A as desired.
So, my feeling is that what you say is correct, but it isn't a problem.
yes, I mean rank (Yblock) < (k+p) sometimes.
Ok. I don't know how often matrix A may be too sparse.
Just in case, i gave it a thought and here's what i think may help to account for this.
It would seem that we can address that by keeping vector L of dimension k+p where L[i]=# of blocks of Q where rank(Qblock)>i.
if B' is compiled in the same pass as B'=sum[ Q^t_(i*)A_(i*)] then it just means that for actual B we need to correct rows of B as B(i*)=(1/L[i]) * B'_(i*). Of course we don't actually have to correct them but just rather keep in mind that B is defined not just by the data but also by this scaling vector L. So subsequent steps may just account for it .
Of course, as an intermediate validation step, we check if any of L[i] is 0, and if it is it pretty much means that rank(A)<k+p and we can't have a good svd anyway so we will probably raise and exception in this case and ask to consider to reduce oversampling or k. Or perhaps it is a bad case for distributed computation anyway.
Right now i am just sending partial L vectors as q row with index 1 and sum it up in combiner and reducer.
so i got to singular values now. I run a unit test so that k+p=n. When i parameterize algorithm so that only one Qblock is produced , the eigenvalues match the stock result at least as good as 10E5. Which is expected under the circumstances. however as soon as i increase number of Qblocks >1, the singular values go astray as much as 10%. Not good. In both cases, the entire Q passes the orthonormality test. I guess it means that as i thought before, doing block orthonormalization this way does result in a subspace different from original span by Y.I need to research on doing orthonormalization with blocks. I think that's the only showstopper here that is still left. It may result in a rewrite that splits one job producing both Q and Bt, into several though.
My quick scan of the literature seems to indicate that best promise for full scale QR over map/reduce is rowwise givens with top k+p rows of the blocks being processed in combiners and reducers for the reminder of the first step of givens QR step. this can be done in the first step of the job, but i am yet to figure how to combine and reduce 'rho' intermediate results into final Q. [Golub, Van Loan 3rd ed] p. $5.2.3
Updated outline document to show corrected form of blockwise extraction of Q
Dima,
I just attached an update to the original outline document I posted. The gist of it is that the Q_i need to be arranged in block diagonal form in order to form a bases of A\Omega. When that is done, my experiments show complete agreement with the original algorithm.
Here is R code that demonstrates decomposition without blocking and a 2 way block decomposition:
# SVD decompose a matrix, extracting the first k singular values/vectors # using k+p random projection svd.rp = function(A, k=10, p=5) { n = nrow(A) y = A %*% matrix(rnorm(n * (k+p)), nrow=n) q = qr.Q(qr(y)) b = t(q) %*% A svd = svd(b) list(u=q%*%svd$u, d=svd$d, v=svd$v) } # blockwise SVD decompose a matrix, extracting the first k singular values/vectors # using k+p random projection svd.rpx = function(A, k=10, p=5) { n = nrow(A) # block sizes n1 = floor(n/2) n2 = nn1 r = matrix(rnorm(n * (k+p)), nrow=n) A1 = A[1:n1,] A2 = A[(n1+1):n,] # blockwise multiplication and basis y1 = A1 %*% r q1 = qr.Q(qr(y1)) y2 = A2 %*% r q2 = qr.Q(qr(y2)) # construction of full q (not really necessary) z1 = diag(0, nrow=nrow(q1), ncol=(k+p)) z2 = diag(0, nrow=nrow(q2), ncol=(k+p)) q = rbind(cbind(q1, z1), cbind(z2, q2)) b = t(q) %*% A # we can compute b without forming the block diagonal Q bx = rbind(t(q1)%*%A1, t(q2)%*%A2) # now the decomposition continues svd = svd(bx) # return all the pieces for checking list(u=q%*%svd$u, d=svd$d, v=svd$v, q1=q1, q2=q2, q=q, b=b, bx=bx) }
Note that this code has a fair bit of fat in it for debugging or illustrative purposes.
Updates.
I think that the outline of the algorithm is now in place. As it currently stands, it should scale to
about a thousand cores (maybe more) and should allow very small memory footprint machines
to participate.
That level should allow us to decompose data in the scale of 110 billion nonzeros, at a guess.
Updated version.
I think that this is actually a feasible algorithm.
These are really the same.
I am currently working to drive the working prototype the version with standalone thin QR step which would be memory independent of num of rows in A and would result into BBt eigensolution of (k+p)x(k+p) dimensionality only, the rest being driven by Map Reduce. I've got single stream version working seemingly well, here's the update on this WIP. Although in the end i am not sure it would offer any reallife improvement, as it would seem to require a second pass over A as it is not possible to finish Q^t x B computation in single step with this approach.
Still, after 2 passes, we should be done with eigen values (and perhaps (k+p)x(k+p) dimensionality for eigensolver input would allow us to increase oversampling p somewhat, hence precision). Hard to see from here yet though. Additional (optional) MR steps would only be needed if U or V is or both are desired.
QR step doc is ready for review.
Especially sections 3.2 and on which i still haven't worked up in the actual code. These reflect my original ideas to deal with blockwise QR in a MapReduce settings. I am probably retracing some block computations already found elsewhere (such as in LAPack) but i think it may actually work out ok.
QR step is now fully verified with MapReduce emulation. Updated QR step document with fixes that resulted from verification. I will be putting all that into complete map reduce, hopefully within a couple of weeks now. Initial patch will be dependent on CDH3b3 hadoop, if we need a backport to legacy api, that'll take some more time on best effort basis on my part.
git patch m1 : WIP but important milestone: prototype & MR implementation at the level of computing full Q and singlular values. as i mentioned, needs CDH3b2 (or b3).
Local MR test runs MR solver in local mode for a moderately low rank random matrix 80,000x100 (r=251, k+p=100). Test output i get on my laptop (first are singulars for 100x100 BBt matrix using commonsmath Eigensolver; 2 – output of SVs produced by Colt SVD of 80,000x100 same source matrix
SSVD solver singular values:
svs: 4220.258342 4215.924299 4213.352353 4210.786495 4203.422385 4201.047189 4194.987920 4193.434856 4187.610381 4185.546818 4179.867986 4176.056232 4172.784145 4169.039073 4168.384457 4164.293827 4162.647531 4160.483398 4157.878385 4154.713189 4152.172788 4149.823917 4146.500139 4144.565227 4142.625983 4141.291209 4138.105799 4135.564939 4134.772833 4129.223450 4129.101594 4126.679080 4124.385614 4121.791730 4119.645948 4115.975993 4112.947092 4109.586452 4107.985419 4104.871381 4102.438854 4099.762117 4098.968505 4095.720204 4091.114871 4090.190141 (...omited) 3950.897035
Colt SVD solver singular values:
svs: 4220.258342 4215.924299 4213.352353 4210.786495 4203.422385 4201.047189 4194.987920 4193.434856 4187.610381 4185.546818 4179.867986 4176.056232 4172.784145 4169.039073 4168.384457 4164.293827 4162.647531 4160.483398 4157.878385 4154.713189 4152.172788 4149.823917 4146.500139 4144.565227 4142.625983 4141.291209 4138.105799 4135.564939 4134.772833 4129.223450 4129.101594 4126.679080 4124.385614 4121.791730 4119.645948 4115.975993 4112.947092 4109.586452 4107.985419 4104.871381 4102.438854 4099.762117 4098.968505 4095.720204 4091.114871 4090.190141 (....omited) 3950.897035
I will be updating my notes with a couple of optimizations i applied in this code not yet mentioned.
Dima
Working notes for the process i used. i think these are pretty much final. I guess it turned out to be a little voluminous, but it mentions pretty much all essential details i may want to put down for my future reference. If one reads it, he/she may start directly with p. 5 of MapReduce implementation and then refer to algorithms in previous sections and dicussion that lead to their formation as needed.
Added U and V computations. Labels of A rows are propagated to U rows as keys. bumped up the test to A of dimensions of 100,000x100 (so it produces 3 A splts now). Actually i just realize that technically U and V should be k columns wide, and i am producing k+p. ok, who cares. Distinction between k and p becomes really nominal now.
I'd appreciate if somebody reviews V computation (p.5.5.2 in working notes), just in case, i already forget the derivation of V computation.
Another uncertainty i have is that i am not sure how to best construct tests for U and V outputs, but i am not sure if i care that much since the computation logic is really trivial (compared to everything else). Existing tests verify singlular values against independent memoryonly svd and assert orthogonality of Q output only.
Stuff that is left is really minor and mahoutspecific or engineering:
– figure out how to integrate with mahout CLI
– add minSplitSize parameter to CLI, and others (k,p, computeU, computeV, .. )
– do we want to backport it to apache 0.20? multiple outputs are crippled in 0.20 and i don't know how one can live without multiple outputs in a job like this.
patch m3:
 updated U, V jobs to produce m x k and n x k geometries, respectively. Notes updated to reflect that.
 added minSplitSize setting to enable larger n and (k+p).
 added apache license statements and minor code cleanup.
Final trunk patch for CDH3 or 0.21 api.
This includes code cleanup, javadoc updates, and mahout CLI class (not tested though).
all existing tests and this test are passing. I tested 100Kx100 matrix in local mode only, S values coincide with 1e10 or better.
changes to dependencies i had to make
 hadoop 0.21 or cdh3 to support multiple outputs
 local MR mode has dependency on commonshttp client, so i included it for test scope only in order for test to work
 changed apachemath dependency from 1.2 to 2.1. Actually mahout math module seems to depend on 2.1 too, not clear why it was not transitive for this one.
 commonsmath 1.2 seemed to have depended on commonscli and 2.1 doesn't have it transitively anymore, but one of the classes in core required it. so i added commonscli in order to fix the build.
Ted, sorry i kind of polluted your issue here. Thank you for your encouragement and help. i probably should've opened another issue once it was clear it diverged far enough, instead of keep putting stuff here.
This should be compatible with DistributedRowMatrix. I did not have real distributed test yet as i don't have a suitable data set yet, but perhaps somebody in the user community with the interest in the method could do it faster than i get to it. I will do tests with moderate scale at some point but i don't want to do it on my company's machine cluster yet and i don't exactly own a good one myself.
I did have a rather mixed use of mahout vector math and just dense arrays. Partly becuase i did not quite have enough time to study all capabilities in math module, and partly becuase i wanted explicit access to memory for control over its more efficient reuse in mass iterations. This may or may not need be rectified over time. But it seems to work pretty well as is.
The patch is git patch (so one needs to use patch p1 instead of p0). I know the standard is set to use svn patches... but i already used git for pulling the trunk (so happens i prefer git in general too so i can have my own commit tree and branching for this work).
If there's enough interest from the project to this contribution, i will support it, and if requested, i can port it to 0.20 if that's the target platform for 0.5, as well as doing other specific mahout architectural tweaks. Please kindly let me know.
Thank you.
Ted, sorry i kind of polluted your issue here. Thank you for your encouragement and help. i probably should've opened another issue once it was clear it diverged far enough, instead of keep putting stuff here.
You didn't pollute anything at all. You contributed a bunch of code here.
So far, the only thing that I worry about is the mixture of array and matrices, but even that might be fine.
I was dubious of the scalability of the streaming QR, but if that works then we should be good to go with very little more work.
Do you have any idea if this will work for large sparse matrices as opposed to the 100K x 100 matrix you are using?
Doesn't the streaming QR decomposition require that we look at each row of Y one at a time in a streaming fashion? That is, isn't that a completely sequential algorithm?
yes, it is 100% streaming in terms of A and Y rows. Assumption is that we are ok to load one A row into memory at a time and we optimize for tall matrices (such as billion by million) So we are bound at n but not at m from memory point of view. Even if it is dense, one such vector would take 8MB memory at a time. but sparse sequential vectors should be ok too (it will probably require a little tweak during Y computations to scan it one time sequentially instead of k+p times as i think it is done now with assumption it can be random).
For memory, the concern is random access q blocks which can be no less than k+p by k+p (that is, for the case of k+p=500, it gets to be 2 Mb). But this is all as far as memory is concerned. (well actually 2 times that, plus there's a Y lookahead buffer in order to make sure we can safely form next block. Plus there's a packed R. so for k+p=500 it looks like minimum memory requirement is rougly in the area of 78Mb. which is well below anything).
Y lookahead buffer is not a requirement of algorithm itself, it's MR specific, to make sure we have at least k+p rows to process in the split before we start next block. i thought about it but not much; at 2mb minimum dense memory requirement it did not strike me as a big issue.
CPU may be more of a problem, it is quadratic, as in any QR algorithm known to me. but i am actually not sure if Givens series would produce more crunching than e.g. Householder's . Givens certainly is as numerically stable as householder's and better than GrammSchmidt. In my tests for 100k tall matrix the orthonormality residuals seem to hold at about no less than 10e13 and surprisingly i did not notice any degradataion at all compared to smaller sizes. Actually I happened to read aobut LAPack methods ithat prefer Givens for possiblity of reordering and thus easier parallelization).
Anyway, speaking of numerical stability, whatever degradation occurs, i think it would be dwarfed by stochastic inaccuracy which grows quite significantly in my low rank tests. Perhaps for kp=500 it should degrade much less than for 2030.
But i guess we need to test at scale to see the limitations.
There is a catch though. With this implementation, m is memory bound. It is not in mapper though but in combiners and mapper of the next step.
But my reasoning was, with 1G memory and k+p =500 it seems to exist rather wide spectrum of admissible combinations of r (block height) and minSplitSize (governing essentually # of mappers needed) that would cover billion rows, and the sweet spot of this combination seem to exceed 1 bln rows.
In addition, there are 2 remedies to consider. First is rather straightforward application of compression on R sequence.
Second remedy results from the fact that our QR merge process is hierarchical. Right now it's twolevel hierarchy. I.e if processes at each stage merge 1000 q blocks, then at the next level we can merge another 1000 q blocks, so total number of q blocks is thus approx. 1 mln. (for 1G memory the number of some 600800k blocks is more plausible). Assuming Q blocks are k+p rows high, that gives us aprroximately 300 mln rows for m. But the trick is that if we have 1G memory in the mapper, then Q blocks don't have to be 500 rows high, then can easily be 200k rows high. Which immediately puts us in the range, conservatively, of 10 bln rows or so without any additional remedies, which i think is about the scale of Google's document index in 2005, if we wanted to do an LSI on it, assuming there's 1 mln lemmas in English, which there's not.
But if we add another maponly pass over blocked Q data, then we can have 1 bln blocks with all the considerations above and that should put us in the range of 30 trillion rows of A. This number grows exponentially with every added MR pass which is why i am saying m is virtually unbound.
Adding these remedies seems to be pretty straighforward, but for a first stab at the problem my estimates for m bound seem to be adequate. With these kind of numbers, this may easily become a technology in a search of a problem. We may add some analysis on optimality of combination of block size and minSplitSize. My initial thought is that finding maximum here is pretty straigtoward, seems to be a task of finding maximum on a second degree polynomial function.
It's more likely that much more memory would go into precision effort rather than maximizing m bound though. E.g. instead of covering the scale, the resources may go into increasing precision and oversampling. In which case additional maponly passes over Q will be tremendously useful. (imagine this could do k+p=5000 with just one additional maponly pass over Q data) If this is the case, then the next low hanging fruit step is to add maponly hierarchical merge of Rs on Q blocks.
However, it's a stochastic algorithm and as such it is probably not good enough for processes that would require such precision (e.g certainly not to do math work on rocket boosters, i think). That said, k+p=5000 probably doesn't make sense. I think applications sharply divide into 2 categories, where precision requirements are either much higher than that, or much lower. I can't think of much in between.
Apologies for multiple editions, mostly typos and correction to numbers. These numbers are still pretty offhand and exercise in mental arithmetics, actual mileage probably will vary as much as + 40% because of unaccounted overhead. I tried to be conservative though.
Doesn't the streaming QR decomposition require that we look at each row of Y one at a time in a streaming fashion? That is, isn't that a completely sequential algorithm?
Even if it is dense, one such vector would take 8MB memory at a time. but sparse sequential vectors should be ok too (it will probably require a little tweak during Y computations to scan it one time sequentially instead of k+p times as i think it is done now with assumption it can be random).
Oh. I guess you hinted at possibilty that if we use sparse sequential vector for A rows, then we memoryunbound for n! so who cares about m then! And then we can have billion by billion even with this implemetnation. Wow. That's an extremely powerful suggestion. But that's definitely requires code review and performance tests. And we go over A only one time so there's no need to revisit the sparse vectors. I'll take a look at it to see if i can engineer a solution. If it is possible at all, it should be extremely simple.
Although can one really implement a streaming read of a value for a mapper value? I am not sure. i guess i need to look at implementation of sparse sequential vector. It would seem to me though that sequenceFile.next() requires deserialization of the whole record, doesn't it? so reading mapper value in a streaming fashion should not be possible, not without some sort of a hack, right?
Also, if we consider wide matrices instead of tall matrices, then maximum number of mappers might be reduced which would affect parallelism on big clusters.
Another consideration for extremely wide matrices i guess is that the A block in this case will certainly overshoot several standard DFS blocks so it may only be efficient if we force data collocation for a big number of blocks (or just increase number of blocks). I am not sure if hadoop quite there yet to tweak that on individual file basis.
d
Although can one really implement a streaming read of a value for a mapper value?
Of course we can split a vector into several slices shoved into sequential records. That would require some work to tweak SequenceFile's record reader logic so it doesn't stop in the middle of a vector (and respectively skips records to the next vector at the beginning of a split) but such possibility definitely exists.
Not sure if Mahout has already something like that, need to look closer. But it should be possible to develop something like DistributedSlicedRowMatrix.
small update.
 added special treatment for SequentialAccessSparseVector computations during dot product computation like it is done everywhere else.
I guess it is about all i can do at this point for n scaling efficiently.
We could scale n even further by splitting the vector into slices as said before, but not before we solve the problem of codedata collocation in 'supersplits' for wide matrices. If we don't do that, it will cause a lot of IO in mappers and kind of defeats the purpose of MR imo.
We could scale n even further by splitting the vector into slices as said before, but not before we solve the problem of codedata collocation in 'supersplits' for wide matrices. If we don't do that, it will cause a lot of IO in mappers and kind of defeats the purpose of MR imo.
I think that my suggested approach handles this already.
The block decomposition of Q via the blockwise QR decomposition implies a breakdown of B into columnwise blocks which can each be handled separately. The results are then combined using concatenation.
I think that my suggested approach handles this already.
The block decomposition of Q via the blockwise QR decomposition implies a breakdown of B into columnwise blocks which can each be handled separately. The results are then combined using concatenation.
Ted, yes, i understand that part, but i think we are talking about different things. What I am talking about is formation of Y rows well before orthonotmalization is even concerned.
What i mean is that right now VectorWritable loads the entire thing into memory. Hence, the bound for width of A. (i.e. we can't load A row that is longer than some memory chunk we can afford for it).
However, what A row is participating in in each case is a bunch of (namely, k+p) dotproducts. In order to produce those, it is sufficient to examine A row sequentialy (i.e. streamingly) in one pass while keeping only k+p values in memory as dotaccumulators.
Hence, say if we equipped VectorWritable with a pushparser like element handler (notorious DocumentHanlder immediately pops in memory form SAXParser) then we will never have to examine more than one element of A row at a time. And hence we are not bound by memory for n (A width) anymore. That handler would form Y rows directly during the sequential examination of A rows. Identical considerations are in effect when forming Qt*A partial products (i already checked for this).
I already thought about this approach a little (and i believe Jake Mannix also posted something very similar to that recently to effect of sequential vector examination backed by a streaming read).
Since it is touching VectorWritable internals, i think i would need to make a change proposal for it and if seems reasonable handle it in another issue. I will do so but i need to check couple of things first in Hadoop see if it is feasible within current MR framework and doesn't blow off all benefits codedata collocation.
If that proposal is implemented, and MR considerations are tackled, we will have SSVD that scales to about a billion rows for 500 singular vlaues and 1G memory in mapper vertically (m) and a gazillion in n (width).
theoretically. How about that.
> I think that my suggested approach handles this already.
> The block decomposition of Q via the blockwise QR decomposition implies a breakdown of B into columnwise blocks which
> can each be handled separately. The results are then combined using concatenation.Ted, yes, i understand that part, but i think we are talking about different things. What I am talking about is formation of Y rows well before orthonotmalization is even concerned.
What i mean is that right now VectorWritable loads the entire thing into memory. Hence, the bound for width of A. (i.e. we can't load A row that is longer than some memory chunk we can afford for it).
I understand that now.
The current limitation is that the sparse representation of a row has to fit into memory. That means that we are limited to cases with a few hundred million nonzero elements and are effectively unlimited on the number of potential columns of A.
The only other place that the total number of elements in a row comes into play is in B. Using the block form of Q, however, we never
have to store an entire row of B, just manageable chunks.
My real worry with your approach is that the average number of elements per row of A is likely to be comparable to p+k. This means that Y = A \Omega will be about as large as A. Processing that sequentially is a nonstarter and the computation of Q without block QR means that Y is processed sequentially. On the other hand, if we block decompose Y, we want blocks that fit into memory because that block size lives on in B and all subsequent steps. Thus, streaming QR is a nonissue in a blocked implementation. The blocked implementation gives a natural parallel implementation.
My real worry with your approach is that the average number of elements per row of A is likely to be comparable to p+k. This means that Y = A \Omega will be about as large as A. Processing that sequentially is a nonstarter and the computation of Q without block QR means that Y is processed sequentially. On the other hand, if we block decompose Y, we want blocks that fit into memory because that block size lives on in B and all subsequent steps. Thus, streaming QR is a nonissue in a blocked implementation. The blocked implementation gives a natural parallel implementation.
I think you misunderstanding it a little. the actual implementation is not that naive. let me clarify.
First, there is blocking. More over, it's a hierarchical blocking.
the way it works, you specify block height, which is more k+p but ideally less than a MR split would host (you can specify more but you may be producing some network traffic then to move noncollocated parts of the split). Blocks are considered completely in parallel. Hence, initial parallelizm degree is m/r where r is average block height. They can (and are) considered independently, among the splits. "thin streaming QR" runs inside the blocks, not on the whole Y.
Secondly, Y matrix, or even its blocks, are never formed. What is formed is shifting intermediate Q buffer of the size (k+p)xr and intermediate upper triangular R of size (k+p)x(k+p). Since they are triangular, there's a rudimental implementation of Matrix itnerface called UpperTriangular not to waste space on lower triangle but still allow random access.
Thirdly, the hierarchy. when we form Q blocks, we will have to update them with Givens operations resulting from merging R matrices. This is done in combiner and this comes very natural to it. If there's say z blocks in a mapper then Q1 goes thru updates resulting from z merges of R, Q2 goes thru udpates resulting from z1 merges and so on. Nothing being concatenated (or unblocked) there except for the R sequence (but it is still sequence, that is sequentially accessed thing) which i already provided memory estimates for. Most importantly, it does not depend on the block height, so you can shrink R sequence length if you have higher Q blocks, but higher Q blocks also take more memory to process at a time. there's a sweet spot to be hit here with parameters defining block height and split size, so it maximizes the thruput. for k+p=500 i don't see any memory concerns there in a single combiner run.
And there's no reducer (i.e. any sizable shuffle and sort) here. At the end of this operation we have a bunch of Rs which corresponds to the number of splits, and a bunch of interbediate Q blocks still same size which correspond to number of Qblocks.
Now we can repeat this process hierarchically with additional maponly passes over Q blocks until only one R block is left. with 1G memory, as i said, my estimate is we can merge up to 1000 Rs per combiner with one MR pass (without extra overhead for single Q block and other java things). (in reality in this implementation there are 2 levels in this hierarchy which seems to point to over 1 bln rows, or about 1 mln Q blocks of some relatively moderate height r>>k+p, but like i said with just one maponly pass one can increase scale of m to single trillions ). This hierarchical merging is exactly what i meant by 'making MR work harder' for us.
There is a poor illustration of this hierarchical process in the doc that makes it perhaps more clear than words.
Also let me point out that the fact that the processes involved in R merging are maponly, which means that if we play the splitting game right in MR, there would practically be no networking IO per MR theory. This is very important imo for such scale. The only IO that occurs is to 'slurp' r sequences from HDFS before next stage of hierarchical Rmerge. For a sequence of 1000 R, k+p 500, the size of R, dense and uncompressed, is approximately 1 mb each, so for a sequence of thousand Rs, the size of such slurp IO, dense and uncompressed, would be 1G, which is less than what i am having today in a single step with Pig for a 200k of protopacked log records today in production and that finishes in a minute.
Bottom line, let's benchmark it. So we don't have to guess. Especially if we can do A vector streaming. I am personally having trouble with logistics of this so far, as i mentioned before. I will get to benchmarking it sooner or later. Important thing for me at this point was to make sure it does correct computation (which it does) and make educated guess about the scale (which is billion by million without vector streaming support or billion to gazillion with vector streaming support, with potential to extend m scale thousand times with each additional maponly pass over Q data (which is (k+p)xm which is again unbounded for n).
thanks.
I think you misunderstanding it a little. the actual implementation is not that naive. let me clarify.
I was hoping I misunderstood it.
And there's no reducer (i.e. any sizable shuffle and sort) here. At the end of this operation we have a bunch of Rs which corresponds to the number of splits, and a bunch of interbediate Q blocks still same size which correspond to number of Qblocks.
Now we can repeat this process hierarchically with additional maponly passes over Q blocks until only one R block is left. with 1G memory, as i said, my estimate is we can merge up to 1000 Rs per combiner with one MR pass (without extra overhead for single Q block and other java things). (in reality in this implementation there are 2 levels in this hierarchy which seems to point to over 1 bln rows, or about 1 mln Q blocks of some relatively moderate height r>>k+p, but like i said with just one maponly pass one can increase scale of m to single trillions ). This hierarchical merging is exactly what i meant by 'making MR work harder' for us.
Sounds to me like the reducer could replicate the combiner and thus implement the second step of your hierarchy which would avoid the second MR pass. You could have a single reducer which receives all combiner outputs and thus merge everything. Since you can't guarantee that the combiner does any work, this is best practice anyway. The specification is that the combiner will run zero or more times.
This also raises the question of whether your combiner can be applied multiple times. I suspect yes. You will know better than I.
Sounds to me like the reducer could replicate the combiner and thus implement the second step of your hierarchy which would avoid the second MR pass. You could have a single reducer which receives all combiner outputs and thus merge everything.
First of all, second level hierarchical merge is done in Bt job mappers – so moving this code into reducer wouldn't actually really win anything in terms of actual IO or # of MR passes.
Secondly, I don't beleive single reducer pass is efficient. All parallelization is gone, isn't it? But the problem is not even that but limitation on how many Rs you can preload for the merges into same process. Rs are preloaded as a side info (and we relinquish one R with every Q we process in combiner, so initially it loads all but then throws them away one by one. So we can't merge them all in one pass, but we can divide them into subsequences. If they merged by subsequences, subsequences must be completed and order is important and should be exactly the same for all Q blocks. The catch is that each Q update has to complete the merges of the reminder of the R sequence. Such subsequence merge is described in computeQHatSequence algorithm in my notes.
Finally, there's no need to sort. When we do R merge, as it is shown in the notes, we have to revisit Q blocks (and Rs) exactly in the same order we just produced them in. Problem is that when we revisit a Q block, we have to have the tail of all subsequent R blocks handy. So even if we sent them to reducers (which was my initial plan), we'd have to sort them by task id they came from and then by their order inside the task that produced them. And then we might need to duplicate R traffic to every reducer process. Huge waste, again.
Since you can't guarantee that the combiner does any work, this is best practice anyway. The specification is that the combiner will run zero or more times.
That actually is absolutely valid and i was very concerned about it in the beginning. I expected it but it did not turn up in tests so far, so that issue was slowly glimmering in my subconsciousness ever since.
If that actually is a problem then yes I might be forced to extend Q pass to include reducer. Which IMO would be a humongous exercise in IO bandwidth waste. there's no need for that. there's only need a for local merge of R sequences and doing second pass over q block sequence you just generated, in the same order you generated it. Even a combiner is an overshoot for that, if i were implementing it in a regular parallel batching way, since there's no need to reorder q blocks.
I also considered that If i am forced to address that i can still do it in the mapper only without even a combiner by pushing Q blocks to local FS side files and doing a second pass over them and that'd be much more efficient than pushing them to another networked process. Not mentioning all the unneeded sorting involved in between and all the armtwisting i was struggling with involved in sending them in same task chunks, same order. There's still rudimentary remainder of that attempt in the code (the key that carries task id and order id for Q block that are never subsequently used anywhere). But saving local side file in mapper and do second pass over it is still way more attractive than reducer approach if i need to fix this.
This also raises the question of whether your combiner can be applied multiple times. I suspect yes. You will know better than I.
Yes, that's the optional hierarchy increment that i was talking about, that we can additionally implement if billion for m and unbound n is not good enough.
Actually i am seriously considering reworking combiner approach into twopassinthemapper approach.
This should be ok because side file is going to be n/(k+p) times smaller than original split. Most likely IO cache will not even let us wait on io in some cases. but if there is disk io, it would be 100% sequential speed.
And that should be even more efficient than asking combiner to sort what is already sorted (which i mostly did out of aesthetics as combiner looks much more fanciful than some explicit sequence file read/writes)
That means that we are limited to cases with a few hundred million nonzero elements and are effectively unlimited on the number of potential columns of A.
I beleive in case of SSVD this statement only partially valid.
It all depends on what you are spec'd to. say we are spec'd to 1G + java/mr overhead, and few hundred million nonzero elements will take few hundred megabytes multiplied by 8. Which is already all or more than all i have. In my spec, (million nonzeros) it's only 8 mb that's seems ok. SSVD assumption doesn't include any significant memory allocation for rows of A, and most importantly, it doesn't have to, i think. Philosophy here is that A is a file, and read it with a buffer to optimize I/O, but my stream buffer doesn't have to be forced to be 1G on me.
> This also raises the question of whether your combiner can be applied multiple times. I suspect yes. You will know better than I.
Yes, that's the optional hierarchy increment that i was talking about, that we can additionally implement if billion for m and unbound n is not good enough.
I think I wrote poorly here.
You don't get a choice. The frame will run the combiner as many times as it feels it wants to. It will run in the mapper or reducer or both or neither.
Your combiner has to be ready to run any number of times on the output of the mapper or the output of other combiners or a combination of the same. This isn't optional in Hadoop. It may not have happened in the test runs you have done, but that doesn't imply anything about whether it will happen at another time.
You don't get a choice. The frame will run the combiner as many times as it feels it wants to. It will run in the mapper or reducer or both or neither.
Ted, thank you for pointing it. As i mentioned couple of comments above, this is very well understood, was a concern from the beginning, never came up as a problem in tests, but I will do a small patch per my comment above that will make processing complexity even faster albeit code might become a little uglier. I am very well aware mapper can spill some records past combiner and send them to sort in the reducer per spec. I
In fact, i had version that does just that on another branch. i just need to yank it and bring in sync with this branch.
Thank you.
Patch update
 eliminates use of combiners. Combiner's code moved back to mapper of the same task. This makes first stage of hierarchical R merges more efficient and consistent with MR spec.
Retested with 100k rows matrix.I will be updating working notes to reflect the change shortly.
Actually i think the biggest issue here is not scale for memory but what i call 'supersplits'.
if we have a rowwise matrix format, and by virtue of SSVD algorithm we have to consider no less than 500x500 blocks, then even with terasort 40tb 2000 node cluster block size parameter setting (128Mb) we are constrained to approx. ~3050k densely wide matrices (even then the expectation is that half of the mapper's data would have to be downloaded from some other node). Which kind of defeats one of the main pitches of MR, codedata collocation. so in case of 1 mln densely wide matrices, and big cluster, we'd be downloading like 99% of data from somewhere else. But we already paid in IO bandwidth when we created input matrix file in the first place, so why should it be a giant inefficient model of a supercomputer in a cloud? Custom batching approach would be way more efficient.
I kind of dubbed the problem above as a 'supersplits problem' in my head.

I beleive i am largely done with this mahout issue as far as method and code are concerned. We, of course, need to test it on something sizable. Benchmarks thus are a pending matter, and i expect they will be net iobound but they would be reasonably scaled for memory per discussion (less the issue of deficient prebuffering in VectorWritable on wide matrices) but additional remedies are clear if needed. There might be some minor tweaks for outputting U,V required. Maybe add one or two more maponly passes over Q data to get additional scale for m. Maybe backport for hadoop 0.20 if mahout decides to release this code.
Next problem i am going to ponder as a side project is devising an SSVD MR method on a blockwise serialized matrices. I think i can devise an SSVD method that can efficiently address "supersplits" problem (with more shuffle and sort I/O though but it would be much more mrlike). Since I think Mahout supports neither block wise formatted matrices, nor, respectively, any BLAS ops for such inputs, an alternative approach to matrix (de)serialization would have to be created. Conceivable scenario would be to reprocess mahout's rowwise matrices into such SSVD blockwise input at additional expense, but singlepurposed data perhaps may well just vectorise blockwise directly.
update to working notes
 changes in latest patch brought to sync with the doc (QJob mapper, no combiner)
 added issues section including my thoughts on limitations of this approach and possible attack angles to alleviate them.
Patch update.
 finalized mahout CLI integration & tested.
Tested on 10k x 10k dense matrix in distributed hadoop mode (compressed source sequence file 743mb) on my 3 yo dual core. It is indeed, as i expected, quite cpubound but good news is that it is so well parallelizable with most load on maponly jobs that it should be no problem to redistribute and front end doesn't require any or cpu capacity at all. Square symmetric matrix of 200x200 sizes computes instantaneously.
The command line i used was:
bin/mahout ssvd i /mahout/ssvdtest/A o /mahout/ssvdout/1 k 100 p 100 r 200 t 2
I also was testing this with CDH3b3 setup.
Sorry for iterating too often, but this was small but important fix for a showstopper.
 added orthonormality assertions in local tests for V, U (they pass with epsilon 1e10 or better).
 small fixes to U, V jobs.
Now should be suitable for LSI type of work with documents having 10k30k lemmas average.
BTW when inserting dependencies for CDH3b3, additional jackson jar is required in order for local hadoop test to work.
in CDH3b2 no such change is required.
Not sure about 0.21, but proper care should be taken as usual to integrate hadoop client's transitive dependencies into mahout dependencies.
Working notes update:
 added more or less formal description of hierarchical QR technique.
a code drop from the head of my stable branch. Various number of computational improvements. Cloudera repo integration for CDH dependency. Automatic detection of the label writable type in the input and propagating that to the U output. This is not the most efficient code i have but that's the only one that contains 0 Mahout code hacks.
 couple commits for U, V jobs went missing. restored.
 added couple options for computing U*pow(Sigma,0.5), V*pow(Sigma,0.5) outputs instead of U,V (makes "user""item" similarity computations easier)
Apparently, i lost several commits when setting code with github. Some CLI patches were lost. So, repared.
just ran thru Reuters dataset example with no problems in hadoop mode on a single noder(with another performance branch, sibling of this one). The longest job seems to be B*Bt job at the moment, will have to look at it to see if there's a space for efficiency improvement.
But the branch with VectorWritable preprocessing is especially usable as it saved on GC iterations a lot and runs much faster
 Optimized B x B' job . Now BBt step runs in 38 seconds in distributed mode on a single node (including the job setup which is about 20 seconds) (Reuters dataset with bigrams).
 fixes R computation bug introduced in recent optimizations
 Verified on Reuters dataset with bigrams. Ran in almost exactly 10 minutes on a single node per command line report (300 singular values) with both U and V jobs. V computation is now the longest (which is expected as likely # of stems >> # of documents). Not sure if there's much space for improvement there, but i will take a look.
 Verified with multiple mappers, multiple q blocks again with full rank decomposition to confirm singular value matches to Colt stock solver's and orthogonality of U and V.
This is essentially 'patch available' except it requires 0.21 or CDH3 to compile (new API which is incomplete in 0.20). Hence i will not be changing the status here – i guess we can use this when Mahout switches to new api.
I will open another issue for backport of this to 0.20.
PS. I guess i did click on 'patch available', didn't I? I did not mean to – this patch changes the whole dependency thing for mahout. please roll back the status. My apologies.
PS. I guess i did click on 'patch available', didn't I? I did not mean to – this patch changes the whole dependency thing for mahout. please roll back the status. My apologies.
Don't worry about it. Until it is committed, it isn't real. And as soon as somebody applies the patch and compiles, they will see failures and thus shouldn't commit it.
Actually, the last time i checked, and unless it got out of sync since then, it would compile and although there are couple of tests that would not pass with CDH3b3 (but all would pass with CDH3b2). It's just it grabs CDH3 from cloudera's repo. So integration attempt might be dangerous.
It's just it grabs CDH3 from cloudera's repo.
Meaning the pom references CDH?
yes. current patch references CDH. It's either 0.21 or CDH but since we use CDH in production, i retained CDH reference. We will have to change it to a concrete 0.21 reference. My best hope is that there's going to be another release of 0.21 or later which Hadoop group would more or less assert as a stable, at which point Mahout could switch dependencies. It looks like it will indeed require some revamping as there's a number of tests that is currently failing with that switch. Unless it is CDH2b2.
Integrated in MahoutQuality #658 (See https://hudson.apache.org/hudson/job/MahoutQuality/658/)
MAHOUT593: initial addition of Stochastic SVD method (related issue is MAHOUT376)
Am I correct that this is, for our purposes, subsumed into MAHOUT593? It's the same patch, just ported to work with Mahout now. I presume that going forward we'll want to iterate on that version rather than entertain this version any more.
Reopen if I'm wrong.
That's fine. We'll have to go back to a slight permutation of this patch with 0.21 api, but it's fine. I'll recreate it when it's time.
Algorithm details.