diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 2d21e69..016db00 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark
 
 import java.net.InetSocketAddress
 import java.util
+import java.util.UUID
 import javax.management.openmbean.KeyAlreadyExistsException
 
 import org.apache.hadoop.hbase.fs.HFileSystem
@@ -675,7 +676,7 @@ class HBaseContext(@transient sc: SparkContext,
         //This will only roll if we have at least one column family file that is
         //bigger then maxSize and we have finished a given row key
         if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
-          rollWriters(writerMap,
+          rollWriters(fs, writerMap,
             regionSplitPartitioner,
             previousRow,
             compactionExclude)
@@ -685,7 +686,7 @@ class HBaseContext(@transient sc: SparkContext,
         previousRow = keyFamilyQualifier.rowKey
       }
       //We have finished all the data so lets close up the writers
-      rollWriters(writerMap,
+      rollWriters(fs, writerMap,
         regionSplitPartitioner,
         previousRow,
         compactionExclude)
@@ -825,7 +826,7 @@ class HBaseContext(@transient sc: SparkContext,
         //This will only roll if we have at least one column family file that is
         //bigger then maxSize and we have finished a given row key
         if (rollOverRequested) {
-          rollWriters(writerMap,
+          rollWriters(fs, writerMap,
             regionSplitPartitioner,
             previousRow,
             compactionExclude)
@@ -839,7 +840,7 @@ class HBaseContext(@transient sc: SparkContext,
       //If there is no writer for a given column family then
       //it will get created here.
       //We have finished all the data so lets close up the writers
-      rollWriters(writerMap,
+      rollWriters(fs, writerMap,
         regionSplitPartitioner,
         previousRow,
         compactionExclude)
@@ -884,17 +885,15 @@ class HBaseContext(@transient sc: SparkContext,
         valueOf(familyOptions.dataBlockEncoding))
     val hFileContext = contextBuilder.build()
 
-    if (null == favoredNodes) {
-      new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-        .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-        .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
-    } else {
-      new WriterLength(0,
-        new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-          .withFavoredNodes(favoredNodes).build())
-    }
+    //Add a '_' to the file name because this is a unfinished file. A rename will happen
+    // to remove the '_' when the file is closed.
+    new WriterLength(0,
+      new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+        .withBloomType(BloomType.valueOf(familyOptions.bloomType))
+        .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+        .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
+        .withFavoredNodes(favoredNodes).build())
+  }
 
   /**
@@ -1008,13 +1007,15 @@ class HBaseContext(@transient sc: SparkContext,
 
   /**
    * This will roll all Writers
+   * @param fs Hadoop FileSystem object
    * @param writerMap HashMap that contains all the writers
    * @param regionSplitPartitioner The partitioner with knowledge of how the
    *                               Region's are split by row key
    * @param previousRow The last row to fill the HFile ending range metadata
    * @param compactionExclude The exclude compaction metadata flag for the HFile
    */
-  private def rollWriters(writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
+  private def rollWriters(fs:FileSystem,
+                          writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
                           regionSplitPartitioner: BulkLoadPartitioner,
                           previousRow: Array[Byte],
                           compactionExclude: Boolean): Unit = {
@@ -1022,7 +1023,7 @@ class HBaseContext(@transient sc: SparkContext,
       if (wl.writer != null) {
         logDebug("Writer=" + wl.writer.getPath +
           (if (wl.written == 0) "" else ", wrote=" + wl.written))
-        closeHFileWriter(wl.writer,
+        closeHFileWriter(fs, wl.writer,
           regionSplitPartitioner,
           previousRow,
           compactionExclude)
@@ -1034,16 +1035,18 @@ class HBaseContext(@transient sc: SparkContext,
 
   /**
    * Function to close an HFile
+   * @param fs Hadoop FileSystem object
    * @param w HFile Writer
    * @param regionSplitPartitioner The partitioner with knowledge of how the
    *                               Region's are split by row key
    * @param previousRow The last row to fill the HFile ending range metadata
    * @param compactionExclude The exclude compaction metadata flag for the HFile
    */
-  private def closeHFileWriter(w: StoreFile.Writer,
-                               regionSplitPartitioner: BulkLoadPartitioner,
-                               previousRow: Array[Byte],
-                               compactionExclude: Boolean): Unit = {
+  private def closeHFileWriter(fs:FileSystem,
+                               w: StoreFile.Writer,
+                               regionSplitPartitioner: BulkLoadPartitioner,
+                               previousRow: Array[Byte],
+                               compactionExclude: Boolean): Unit = {
     if (w != null) {
       w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
         Bytes.toBytes(System.currentTimeMillis()))
@@ -1055,6 +1058,18 @@ class HBaseContext(@transient sc: SparkContext,
         Bytes.toBytes(compactionExclude))
       w.appendTrackedTimestampsToMetadata()
       w.close()
+
+      val srcPath = w.getPath
+
+      //In the new path you will see that we are using substring. This is to
+      // remove the '_' character in front of the HFile name. '_' is a character
+      // that will tell HBase that this file shouldn't be included in the bulk load
+      // This feature is to protect for unfinished HFiles being submitted to HBase
+      val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1))
+      if (!fs.rename(srcPath, newPath)) {
+        throw new IOException("Unable to rename '" + srcPath +
+          "' to " + newPath)
+      }
     }
   }
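
For context, the core idea of the patch is a write-then-rename protocol: each HFile is created under a name that starts with '_' (which, per the patch's own comments, tells HBase's bulk load to skip the file), and the leading '_' is stripped with a FileSystem rename once the writer is closed, so only finished files are ever picked up. The sketch below is a minimal standalone illustration of that convention, not the patch's code; the object name, method names, and the /tmp/bulkload/cf directory are illustrative assumptions.

import java.io.IOException
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object UnderscoreRenameSketch {

  // New files get a '_' prefix plus a random UUID, so an unfinished file is
  // never mistaken for a loadable HFile. (Illustrative helper, not HBaseContext code.)
  def newTempPath(familyDir: Path): Path =
    new Path(familyDir, "_" + UUID.randomUUID.toString.replaceAll("-", ""))

  // After the writer is closed, drop the leading '_' so the finished file becomes
  // visible to the bulk load; fail loudly if the rename does not succeed.
  def markFinished(fs: FileSystem, tempPath: Path): Path = {
    val finalPath = new Path(tempPath.getParent, tempPath.getName.substring(1))
    if (!fs.rename(tempPath, finalPath)) {
      throw new IOException("Unable to rename '" + tempPath + "' to " + finalPath)
    }
    finalPath
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val familyDir = new Path("/tmp/bulkload/cf") // illustrative output directory
    val temp = newTempPath(familyDir)
    fs.create(temp).close()                      // stand-in for writing the real HFile
    val finished = markFinished(fs, temp)
    println("finished HFile at " + finished)
  }
}

The rename happens only after close(), so a crash mid-write leaves behind an underscore-prefixed file that the bulk load ignores rather than a truncated HFile that it would try to load.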