Description
When combining Puts for the same row in the map phase, PutCombiner#reduce uses the logic below. On the first iteration of the for loop, one Put object is added to the puts map. On every later iteration, putAll replaces the KeyValues of a family in that Put with the KeyValues of the same family from the next Put. So for each family only the KeyValues of the last Put reach the map output, and the KeyValues from the earlier Puts are silently dropped (data loss).
Map<byte[], Put> puts = new TreeMap<byte[], Put>(Bytes.BYTES_COMPARATOR);
for (Put p : vals) {
  cnt++;
  if (!puts.containsKey(p.getRow())) {
    puts.put(p.getRow(), p);
  } else {
    puts.get(p.getRow()).getFamilyMap().putAll(p.getFamilyMap());
  }
}
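The overwrite can be reproduced without HBase. The following is a minimal sketch, assuming a family map can be modelled as a plain `Map<String, List<String>>`; the class `PutAllOverwriteDemo` and its helpers are hypothetical stand-ins, not HBase API. It only illustrates that `Map#putAll` replaces the value stored under an existing key rather than merging it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PutAllOverwriteDemo {
    // Hypothetical stand-in for Put#getFamilyMap(): family name -> cells.
    static Map<String, List<String>> familyMap(String family, String... cells) {
        Map<String, List<String>> m = new TreeMap<>();
        m.put(family, new ArrayList<>(Arrays.asList(cells)));
        return m;
    }

    // Mirrors the putAll-based combine: for a family that is already present,
    // the later list replaces the earlier one instead of being appended to it.
    static Map<String, List<String>> combineWithPutAll(List<Map<String, List<String>>> puts) {
        Map<String, List<String>> combined = new TreeMap<>();
        for (Map<String, List<String>> p : puts) {
            combined.putAll(p);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Map<String, List<String>>> puts = Arrays.asList(
            familyMap("cf", "q1=v1"),
            familyMap("cf", "q2=v2"));
        System.out.println(combineWithPutAll(puts)); // prints {cf=[q2=v2]}
    }
}
```

The cell `q1=v1` from the first map is gone from the result, which is the same effect described above for Puts that share a column family.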
The logic should be changed along the following lines. Buffering by rowkey is unnecessary here because all the Puts passed to one reduce call are known to have the same rowkey.
Put finalPut = null;
Map<byte[], List<? extends Cell>> familyMap = null;
for (Put p : vals) {
  cnt++;
  if (finalPut == null) {
    finalPut = p;
    familyMap = finalPut.getFamilyMap();
  } else {
    for (Entry<byte[], List<? extends Cell>> entry : p.getFamilyMap().entrySet()) {
      List<? extends Cell> list = familyMap.get(entry.getKey());
      if (list == null) {
        familyMap.put(entry.getKey(), entry.getValue());
      } else {
        ((List<KeyValue>) list).addAll((List<KeyValue>) entry.getValue());
      }
    }
  }
}
context.write(row, finalPut);
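The per-family append in the proposal can be illustrated with plain collections. This is a sketch under the same assumption as before (a family map modelled as `Map<String, List<String>>`); `FamilyMergeDemo` and `mergeFamilies` are hypothetical names. Unlike the proposed code, which reuses the first Put and its lists, this sketch copies cells into fresh lists to avoid the unchecked casts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FamilyMergeDemo {
    // Merge family maps by appending cells family by family,
    // so no cell from an earlier map is discarded.
    static Map<String, List<String>> mergeFamilies(List<Map<String, List<String>>> puts) {
        Map<String, List<String>> merged = new TreeMap<>();
        for (Map<String, List<String>> p : puts) {
            for (Map.Entry<String, List<String>> e : p.entrySet()) {
                merged.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                      .addAll(e.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<String>> first = new TreeMap<>();
        first.put("cf", Arrays.asList("q1=v1"));
        Map<String, List<String>> second = new TreeMap<>();
        second.put("cf", Arrays.asList("q2=v2"));
        second.put("cf2", Arrays.asList("q3=v3"));
        System.out.println(mergeFamilies(Arrays.asList(first, second)));
        // prints {cf=[q1=v1, q2=v2], cf2=[q3=v3]}
    }
}
```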
The TODOs left by Nick also need to be implemented:
// TODO: would be better if we knew <code>K row</code> and Put rowkey were
// identical. Then this whole Put buffering business goes away.
// TODO: Could use HeapSize to create an upper bound on the memory size of
// the puts map and flush some portion of the content while looping. This
// flush could result in multiple Puts for a single rowkey. That is
// acceptable because Combiner is run as an optimization and it's not
// critical that all Puts are grouped perfectly.
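The second TODO could look roughly like the sketch below. It is not the HBase implementation: `BoundedCombineDemo`, `FakePut`, the fixed per-cell size, and the threshold parameter are all hypothetical stand-ins chosen so the example runs on the standard library alone. In real code the size estimate would presumably come from the Put's `heapSize()`, and each flush would be a `context.write(row, put)`.

```java
import java.util.ArrayList;
import java.util.List;

class BoundedCombineDemo {
    // Hypothetical stand-in for a Put: its cells plus an estimated heap size in bytes.
    static final class FakePut {
        final List<String> cells = new ArrayList<>();
        long heapSize = 0;

        void add(String cell, long size) {
            cells.add(cell);
            heapSize += size;
        }
    }

    // Combine the cells of one row, flushing the buffered put whenever its
    // estimated size reaches the threshold. One row may therefore yield
    // several puts, which the TODO states is acceptable for a combiner.
    static List<FakePut> combine(List<String> cells, long bytesPerCell, long threshold) {
        List<FakePut> flushed = new ArrayList<>();
        FakePut current = new FakePut();
        for (String cell : cells) {
            current.add(cell, bytesPerCell);
            if (current.heapSize >= threshold) {
                flushed.add(current);      // stands in for context.write(row, put)
                current = new FakePut();
            }
        }
        if (!current.cells.isEmpty()) {
            flushed.add(current);          // flush whatever is left at the end
        }
        return flushed;
    }
}
```

For example, five cells of 10 bytes each with a threshold of 20 bytes produce three flushed puts holding 2, 2 and 1 cells. No cell is lost; the row is just split across several outputs.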
Attachments
Issue Links
- relates to
  - HBASE-11994 PutCombiner floods the M/R log with repeated log messages. (Closed)