diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 2d21e69..e894c4b 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark
 
 import java.net.InetSocketAddress
 import java.util
+import java.util.UUID
 import javax.management.openmbean.KeyAlreadyExistsException
 
 import org.apache.hadoop.hbase.fs.HFileSystem
@@ -675,7 +676,7 @@ class HBaseContext(@transient sc: SparkContext,
         //This will only roll if we have at least one column family file that is
         //bigger then maxSize and we have finished a given row key
         if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
-          rollWriters(writerMap,
+          rollWriters(fs, writerMap,
            regionSplitPartitioner,
            previousRow,
            compactionExclude)
@@ -685,7 +686,7 @@ class HBaseContext(@transient sc: SparkContext,
        previousRow = keyFamilyQualifier.rowKey
       }
       //We have finished all the data so lets close up the writers
-      rollWriters(writerMap,
+      rollWriters(fs, writerMap,
        regionSplitPartitioner,
        previousRow,
        compactionExclude)
@@ -825,7 +826,7 @@ class HBaseContext(@transient sc: SparkContext,
         //This will only roll if we have at least one column family file that is
         //bigger then maxSize and we have finished a given row key
         if (rollOverRequested) {
-          rollWriters(writerMap,
+          rollWriters(fs, writerMap,
            regionSplitPartitioner,
            previousRow,
            compactionExclude)
@@ -839,7 +840,7 @@ class HBaseContext(@transient sc: SparkContext,
       //If there is no writer for a given column family then
       //it will get created here.
       //We have finished all the data so lets close up the writers
-      rollWriters(writerMap,
+      rollWriters(fs, writerMap,
        regionSplitPartitioner,
        previousRow,
        compactionExclude)
@@ -886,13 +887,16 @@ class HBaseContext(@transient sc: SparkContext,
 
     if (null == favoredNodes) {
       new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-        .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-        .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
+        .withBloomType(BloomType.valueOf(familyOptions.bloomType))
+        .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+        .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
+        .build())
     } else {
       new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf),
         new HFileSystem(fs))
-        .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+        .withBloomType(BloomType.valueOf(familyOptions.bloomType))
         .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+        .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
         .withFavoredNodes(favoredNodes).build())
     }
   }
@@ -1008,13 +1012,15 @@ class HBaseContext(@transient sc: SparkContext,
 
   /**
    * This will roll all Writers
+   * @param fs                     Hadoop FileSystem object
    * @param writerMap              HashMap that contains all the writers
    * @param regionSplitPartitioner The partitioner with knowledge of how the
    *                               Region's are split by row key
    * @param previousRow            The last row to fill the HFile ending range metadata
    * @param compactionExclude      The exclude compaction metadata flag for the HFile
    */
-  private def rollWriters(writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
+  private def rollWriters(fs:FileSystem,
+                          writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
                           regionSplitPartitioner: BulkLoadPartitioner,
                           previousRow: Array[Byte],
                           compactionExclude: Boolean): Unit = {
@@ -1022,7 +1028,7 @@ class HBaseContext(@transient sc: SparkContext,
       if (wl.writer != null) {
         logDebug("Writer=" + wl.writer.getPath +
           (if (wl.written == 0) "" else ", wrote=" + wl.written))
-        closeHFileWriter(wl.writer,
+        closeHFileWriter(fs, wl.writer,
           regionSplitPartitioner,
           previousRow,
           compactionExclude)
@@ -1034,16 +1040,18 @@ class HBaseContext(@transient sc: SparkContext,
 
   /**
    * Function to close an HFile
+   * @param fs                     Hadoop FileSystem object
    * @param w                      HFile Writer
    * @param regionSplitPartitioner The partitioner with knowledge of how the
    *                               Region's are split by row key
    * @param previousRow            The last row to fill the HFile ending range metadata
    * @param compactionExclude      The exclude compaction metadata flag for the HFile
    */
-  private def closeHFileWriter(w: StoreFile.Writer,
-                               regionSplitPartitioner: BulkLoadPartitioner,
-                               previousRow: Array[Byte],
-                               compactionExclude: Boolean): Unit = {
+  private def closeHFileWriter(fs:FileSystem,
+                               w: StoreFile.Writer,
+                               regionSplitPartitioner: BulkLoadPartitioner,
+                               previousRow: Array[Byte],
+                               compactionExclude: Boolean): Unit = {
     if (w != null) {
       w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
         Bytes.toBytes(System.currentTimeMillis()))
@@ -1055,6 +1063,13 @@ class HBaseContext(@transient sc: SparkContext,
         Bytes.toBytes(compactionExclude))
       w.appendTrackedTimestampsToMetadata()
       w.close()
+
+      val srcPath = w.getPath
+      val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1))
+      if (!fs.rename(srcPath, newPath)) {
+        throw new RuntimeException("Unable to rename '" + srcPath +
+          "' to " + newPath)
+      }
     }
   }
 
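
Note (illustrative, not part of the patch): the hunks above all implement one pattern: each HFile is written under a temporary name with a leading "_" plus a random UUID via withFilePath, and closeHFileWriter strips the underscore with a FileSystem.rename only after the writer is cleanly closed, so a partially written file from a failed task keeps its underscore prefix and is ignored by downstream bulk-load scanning. The sketch below shows that write-then-rename pattern in isolation against plain Hadoop FileSystem APIs; the object and method names (RenameOnClose, tempHFilePath, finalizeHFile) are hypothetical, and fs.create merely stands in for the StoreFile writer used in the patch.

// Minimal sketch of the underscore-prefix + rename-on-close pattern, assuming
// only hadoop-common on the classpath. Names here are illustrative.
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameOnClose {
  // Build a temporary, in-progress file name: the leading "_" keeps the file
  // invisible to readers that skip underscore-prefixed files.
  def tempHFilePath(familyDir: Path): Path =
    new Path(familyDir, "_" + UUID.randomUUID.toString.replaceAll("-", ""))

  // After the writer is closed, drop the leading "_" so the file becomes visible.
  def finalizeHFile(fs: FileSystem, srcPath: Path): Path = {
    val newPath = new Path(srcPath.getParent, srcPath.getName.substring(1))
    if (!fs.rename(srcPath, newPath)) {
      throw new RuntimeException("Unable to rename '" + srcPath + "' to " + newPath)
    }
    newPath
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val tmp = tempHFilePath(new Path("/tmp/bulkload/cf")) // hypothetical staging dir
    val out = fs.create(tmp)  // stand-in for StoreFile.WriterBuilder...withFilePath(tmp)
    out.close()               // only rename once the write has fully completed
    println("finalized: " + finalizeHFile(fs, tmp))
  }
}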