Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 13256)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -3194,7 +3194,93 @@
       closeBulkRegionOperation();
     }
   }
+
+  /**
+   * Attempts to atomically load a group of hfiles. This is critical for loading
+   * rows with multiple column families atomically.
+   *
+   * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+   * @return true if successful, false if failed recoverably
+   * @throws IOException if failed unrecoverably.
+   */
+  public boolean bulkLoadHFilesRefresh(List<Pair<byte[], String>> familyPaths)
+      throws IOException {
+    Preconditions.checkNotNull(familyPaths);
+    // we need writeLock for multi-family bulk load
+    startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
+    try {
+      this.writeRequestsCount.increment();
+      // There possibly was a split that happened between when the split keys
+      // were gathered and before the HRegion's write lock was taken. We need
+      // to validate the HFile region before attempting to bulk load all of them.
+      List<IOException> ioes = new ArrayList<IOException>();
+      List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
+      for (Pair<byte[], String> p : familyPaths) {
+        byte[] familyName = p.getFirst();
+        String path = p.getSecond();
+
+        Store store = getStore(familyName);
+        if (store == null) {
+          IOException ioe = new DoNotRetryIOException(
+              "No such column family " + Bytes.toStringBinary(familyName));
+          ioes.add(ioe);
+          failures.add(p);
+        } else {
+          try {
+            store.assertBulkLoadHFileOk(new Path(path));
+          } catch (WrongRegionException wre) {
+            // recoverable (file doesn't fit in region)
+            failures.add(p);
+          } catch (IOException ioe) {
+            // unrecoverable (hdfs problem)
+            ioes.add(ioe);
+          }
+        }
+      }
+
+      // validation failed, bail out before doing anything permanent.
+      if (failures.size() != 0) {
+        StringBuilder list = new StringBuilder();
+        for (Pair<byte[], String> p : failures) {
+          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
+              .append(p.getSecond());
+        }
+        // problem when validating
+        LOG.warn("There was a recoverable bulk load failure likely due to a" +
+            " split. These (family, HFile) pairs were not loaded: " + list);
+        return false;
+      }
+
+      // validation failed because of some sort of IO problem.
+      if (ioes.size() != 0) {
+        LOG.error("There were IO errors when checking if bulk load is ok. " +
+            "throwing exception!");
+        throw MultipleIOException.createIOException(ioes);
+      }
+
+      for (Pair<byte[], String> p : familyPaths) {
+        byte[] familyName = p.getFirst();
+        String path = p.getSecond();
+        Store store = getStore(familyName);
+        try {
+          store.bulkLoadHFileRefresh(path);
+        } catch (IOException ioe) {
+          // a failure here causes an atomicity violation that we currently
+          // cannot recover from since it is likely a failed hdfs operation.
+
+          // TODO Need a better story for reverting partial failures due to HDFS.
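+          // Families already processed by this loop remain loaded, so a
+          // failure here leaves the multi-family load only partially applied.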
+ LOG.error("There was a partial failure due to IO when attempting to" + + " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond()); + throw ioe; + } + } + return true; + } finally { + closeBulkRegionOperation(); + } + } + @Override public boolean equals(Object o) { if (!(o instanceof HRegion)) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 13256) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -530,7 +530,7 @@ // Move the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); - if (!srcFs.equals(fs)) { + if (!srcFs.getUri().equals(fs.getUri())) { LOG.info("File " + srcPath + " on different filesystem than " + "destination store - moving to this filesystem."); Path tmpPath = getTmpPath(); @@ -571,7 +571,73 @@ + " into store " + this + " (new location: " + dstPath + ")"); } + + + /** + * This method should only be called from HRegion. It is assumed that the + * ranges of values in the HFile fit within the stores assigned region. + * (assertBulkLoadHFileOk checks this) + */ + void bulkLoadHFileRefresh(String srcPathStr) throws IOException { + Path srcPath = new Path(srcPathStr); + + // Move the file if it's on another filesystem + FileSystem srcFs = srcPath.getFileSystem(conf); + if (!srcFs.getUri().equals(fs.getUri())) { + LOG.info("File " + srcPath + " on different filesystem than " + + "destination store - moving to this filesystem."); + Path tmpPath = getTmpPath(); + FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf); + LOG.info("Copied to temporary path on dst filesystem: " + tmpPath); + srcPath = tmpPath; + } + + Path dstPath = StoreFile.getRandomFilename(fs, homedir); + LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath); + StoreFile.rename(fs, srcPath, dstPath); + + StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf, + this.family.getBloomFilterType(), this.dataBlockEncoder); + passSchemaMetricsTo(sf); + + sf.createReader(); + + + LOG.info("Moved hfile " + srcPath + " into store directory " + + homedir + " - updating store file list."); + + // Append the new storefile into the list + this.lock.writeLock().lock(); + this.lock.readLock().lock(); + try { + ArrayList newFiles = new ArrayList(); + + + for(StoreFile sff :storefiles.asList()){ + sff.deleteReader(); + } + + + newFiles.add(sf); + this.storefiles = sortAndClone(newFiles); + + } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. + this.lock.readLock().unlock(); + this.lock.writeLock().unlock(); + } + notifyChangedReadersObservers(); + LOG.info("Successfully loaded store file " + srcPath + + " into store " + this + " (new location: " + dstPath + ")"); + } + + + /** * Get a temporary path in this region. These temporary files * will get cleaned up when the region is re-opened if they are * still around. 
@@ -2065,7 +2131,12 @@
   long getTotalStaticIndexSize() {
     long size = 0;
     for (StoreFile s : storefiles) {
-      size += s.getReader().getUncompressedDataIndexSize();
+      StoreFile.Reader r = s.getReader();
+      if (r == null) {
+        LOG.warn("StoreFile " + s + " has a null Reader");
+        continue;
+      }
+      size += r.getUncompressedDataIndexSize();
     }
     return size;
   }
@@ -2081,6 +2152,10 @@
     long size = 0;
     for (StoreFile s : storefiles) {
       StoreFile.Reader r = s.getReader();
+      if (r == null) {
+        LOG.warn("StoreFile " + s + " has a null Reader");
+        continue;
+      }
       size += r.getTotalBloomSize();
     }
     return size;
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 13256)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -2653,6 +2653,19 @@
     HRegion region = getRegion(regionName);
     return region.bulkLoadHFiles(familyPaths);
   }
+
+  /**
+   * Atomically bulk load several HFiles into an open region.
+   * @return true if successful, false if failed recoverably (no action taken)
+   * @throws IOException if failed unrecoverably
+   */
+  @Override
+  public boolean bulkLoadHFilesRefresh(List<Pair<byte[], String>> familyPaths,
+      byte[] regionName) throws IOException {
+    checkOpen();
+    HRegion region = getRegion(regionName);
+    return region.bulkLoadHFilesRefresh(familyPaths);
+  }
 
   Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java	(revision 13256)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java	(working copy)
@@ -107,7 +107,7 @@
   private void usage() {
     System.err.println("usage: " + NAME +
         " /path/to/hfileoutputformat-output " +
-        "tablename");
+        "tablename [refresh]");
   }
 
   /**
@@ -259,7 +259,100 @@
       }
     }
   }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table. This method is not threadsafe.
+   *
+   * @param hfofDir the directory that was provided as the output path
+   * of a job using HFileOutputFormat
+   * @param table the table to load into
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoadRefresh(Path hfofDir, final HTable table)
+      throws TableNotFoundException, IOException {
+    final HConnection conn = table.getConnection();
+
+    if (!conn.isTableAvailable(table.getTableName())) {
+      throw new TableNotFoundException("Table " +
+          Bytes.toStringBinary(table.getTableName()) +
+          " is not currently available.");
+    }
+
+    // initialize thread pools
+    int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
+        Runtime.getRuntime().availableProcessors());
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        builder.build());
+    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
+    try {
+      discoverLoadQueue(queue, hfofDir);
+      int count = 0;
+
+      if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not find any files to load in " +
+            "directory " + hfofDir.toUri() + ". Does it contain files in " +
+            "subdirectories that correspond to column family names?");
+        return;
+      }
+
+      // Assumes that region splits can happen while this occurs.
+      while (!queue.isEmpty()) {
+        // need to reload split keys each iteration.
+        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+        if (count != 0) {
+          LOG.info("Split occurred while grouping HFiles, retry attempt " +
+              count + " with " + queue.size() +
+              " files remaining to group or split");
+        }
+
+        int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 0);
+        if (maxRetries != 0 && count >= maxRetries) {
+          LOG.error("Retry attempted " + count +
+              " times without completing, bailing out");
+          return;
+        }
+        count++;
+
+        // Using ByteBuffer for byte[] equality semantics
+        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+            pool, queue, startEndKeys);
+
+        bulkLoadPhaseRefresh(table, conn, pool, queue, regionGroups);
+
+        // NOTE: The next iteration's split / group could happen in parallel to
+        // atomic bulkloads assuming that there are splits and no merges, and
+        // that we can atomically pull out the groups we want to retry.
+      }
+    } finally {
+      pool.shutdown();
+      if (queue != null && !queue.isEmpty()) {
+        StringBuilder err = new StringBuilder();
+        err.append("-------------------------------------------------\n");
+        err.append("Bulk load aborted with some files not yet loaded:\n");
+        err.append("-------------------------------------------------\n");
+        for (LoadQueueItem q : queue) {
+          err.append("  ").append(q.hfilePath).append('\n');
+        }
+        LOG.error(err);
+      }
+    }
+  }
+
   /**
    * This takes the LQI's grouped by likely regions and attempts to bulk load
    * them. Any failures are re-queued for another pass with the
@@ -306,7 +399,54 @@
       }
     }
   }
+
+  /**
+   * This takes the LQI's grouped by likely regions and attempts to bulk load
+   * them atomically. Any failures are re-queued for another pass with the
+   * groupOrSplitPhase.
+   */
+  protected void bulkLoadPhaseRefresh(final HTable table, final HConnection conn,
+      ExecutorService pool, Deque<LoadQueueItem> queue,
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+    // atomically bulk load the groups.
+    Set<Future<List<LoadQueueItem>>> loadingFutures =
+        new HashSet<Future<List<LoadQueueItem>>>();
+    for (Entry<ByteBuffer, Collection<LoadQueueItem>> e :
+        regionGroups.asMap().entrySet()) {
+      final byte[] first = e.getKey().array();
+      final Collection<LoadQueueItem> lqis = e.getValue();
+
+      final Callable<List<LoadQueueItem>> call =
+          new Callable<List<LoadQueueItem>>() {
+        public List<LoadQueueItem> call() throws Exception {
+          List<LoadQueueItem> toRetry = tryAtomicRegionLoadRefresh(conn,
+              table.getTableName(), first, lqis);
+          return toRetry;
+        }
+      };
+      loadingFutures.add(pool.submit(call));
+    }
+
+    // get all the results.
+    for (Future<List<LoadQueueItem>> future : loadingFutures) {
+      try {
+        List<LoadQueueItem> toRetry = future.get();
+
+        // LQIs that are requeued to be regrouped.
+        queue.addAll(toRetry);
+
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          // At this point something unrecoverable has happened.
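+          // The failing group may have been partially applied on the region
+          // server; see the TODO in HRegion#bulkLoadHFilesRefresh.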
+          // TODO Implement bulk load recovery
+          throw new IOException(
+              "BulkLoad encountered an unrecoverable problem", t);
+        }
+        LOG.error("Unexpected execution exception during bulk load", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during bulk load", e1);
+        throw new IllegalStateException(e1);
+      }
+    }
+  }
+
   /**
    * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
    * bulk load region targets.
@@ -503,7 +643,58 @@
       throw e;
     }
   }
+
+  /**
+   * Attempts to do an atomic load of many hfiles into a region. If it fails,
+   * it returns a list of hfiles that need to be retried. If it is successful
+   * it will return an empty list.
+   *
+   * NOTE: To maintain row atomicity guarantees, the region server callable
+   * should succeed atomically and fail atomically.
+   *
+   * Protected for testing.
+   *
+   * @return empty list on success, list of items to retry on recoverable
+   * failure
+   */
+  protected List<LoadQueueItem> tryAtomicRegionLoadRefresh(final HConnection conn,
+      byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
+      throws IOException {
+    final List<Pair<byte[], String>> famPaths =
+        new ArrayList<Pair<byte[], String>>(lqis.size());
+    for (LoadQueueItem lqi : lqis) {
+      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+    }
+
+    final ServerCallable<Boolean> svrCallable =
+        new ServerCallable<Boolean>(conn, tableName, first) {
+      @Override
+      public Boolean call() throws Exception {
+        LOG.debug("Going to connect to server " + location + " for row " +
+            Bytes.toStringBinary(row));
+        byte[] regionName = location.getRegionInfo().getRegionName();
+        return server.bulkLoadHFilesRefresh(famPaths, regionName);
+      }
+    };
+
+    try {
+      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
+      boolean success = svrCallable.withRetries();
+      if (!success) {
+        LOG.warn("Attempt to bulk load region containing " +
+            Bytes.toStringBinary(first) + " into table " +
+            Bytes.toStringBinary(tableName) + " with files " + lqis +
+            " failed. This is recoverable and they will be retried.");
+        toRetry.addAll(lqis); // return lqi's to retry
+      }
+      // success
+      return toRetry;
+    } catch (IOException e) {
+      LOG.error("Encountered unrecoverable error from region server", e);
+      throw e;
+    }
+  }
+
   /**
    * Split a storefile into a top and bottom half, maintaining
    * the metadata, recreating bloom filters, etc.
@@ -688,25 +879,29 @@
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length < 2 || args.length > 3) {
       usage();
       return -1;
     }
 
     String dirPath = args[0];
     String tableName = args[1];
 
     boolean tableExists = this.doesTableExist(tableName);
     if (!tableExists) this.createTable(tableName, dirPath);
 
     Path hfofDir = new Path(dirPath);
     HTable table = new HTable(this.cfg, tableName);
 
-    doBulkLoad(hfofDir, table);
+    if (args.length == 3) {
+      doBulkLoadRefresh(hfofDir, table);
+    } else {
+      doBulkLoad(hfofDir, table);
+    }
     return 0;
   }
 
   public static void main(String[] args) throws Exception {
     int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
     System.exit(ret);
   }
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(revision 13256)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(working copy)
@@ -390,6 +390,18 @@
    */
   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
       byte[] regionName) throws IOException;
+
+  /**
+   * Atomically bulk load multiple HFiles (say from different column families)
+   * into an open region.
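+   *
+   * Note that unlike bulkLoadHFiles, the refresh variant replaces each
+   * target store's existing files with the newly loaded hfile (see
+   * Store#bulkLoadHFileRefresh).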
+   *
+   * @param familyPaths List of (family, hfile path) pairs
+   * @param regionName name of region to load hfiles into
+   * @return true if successful, false if failed recoverably
+   * @throws IOException if it fails unrecoverably
+   */
+  public boolean bulkLoadHFilesRefresh(List<Pair<byte[], String>> familyPaths,
+      byte[] regionName) throws IOException;
 
   // Master methods
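
Note to reviewers (not part of the patch): the sketch below shows how the new atomic path would be driven end to end once the patch is applied. The table name, input directory, and retry cap are placeholders; hbase.bulkload.retries.number is the setting read by doBulkLoadRefresh above, and its default of 0 retries until the queue drains.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class AtomicBulkLoadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Bound the regroup/retry passes taken when a region split races
        // with the load; the default of 0 keeps retrying indefinitely.
        conf.setInt("hbase.bulkload.retries.number", 10);

        HTable table = new HTable(conf, "mytable");       // pre-existing table
        Path hfofDir = new Path("/user/me/hfof-output");  // HFileOutputFormat output

        // Equivalent to the new optional CLI argument:
        //   hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
        //       /user/me/hfof-output mytable refresh
        new LoadIncrementalHFiles(conf).doBulkLoadRefresh(hfofDir, table);
        table.close();
      }
    }

Because Store.bulkLoadHFileRefresh deletes a store's existing files before swapping in the loaded hfile, the refresh path replaces a region's current contents rather than appending to them; callers who need to preserve existing data should use doBulkLoad instead.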