diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index da45f1a..47af319 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -630,6 +630,9 @@ // this way we can keep only one record writer open for each partition value // in the reducer thereby reducing the memory pressure on reducers HIVEOPTSORTDYNAMICPARTITION("hive.optimize.sort.dynamic.partition", true), + //maximum number of paths that can be kept in memory for closing + HIVEOPT_FILESINKOPERATOR_PATHS_MAX_SIZE("hive.optimize.filesinkoperator.paths.maxsize", 1), + HIVEOPT_FILESINKOPERATOR_CLOSE_THREADS_SIZE("hive.filesinkoperator.close.threads.size", 1), HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false), HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 6af6b2d..8822ce0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -27,6 +27,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -102,6 +106,87 @@ private transient List keys; private transient int numKeyColToRead; + //queues to hold the set of paths to be closed or committed + private transient BlockingQueue pathToBeClosedList; + private transient BlockingQueue pathToBeCommitedList; + private transient AtomicBoolean done = new AtomicBoolean(false); + private transient AtomicBoolean commit = new AtomicBoolean(false); + //Threads for closing the path. + private transient CloseThread[] closeThread; + //Threads for committing + private transient CloseThread[] commiterThread; + + class CloseThread extends Thread { + boolean closeAllWriters; + boolean commit; + AtomicBoolean done; + BlockingQueue pathsToClose; + + public CloseThread(boolean closeAllWriters, boolean commit, + AtomicBoolean doneFlag, BlockingQueue queue) { + this.closeAllWriters = closeAllWriters; + this.commit = commit; + this.done = doneFlag; + this.pathsToClose = queue; + } + + @Override + public void run() { + LOG.info("starting close thread : " + Thread.currentThread().getId() + + " closeAllWriters=" + closeAllWriters + "; commit=" + commit); + while(!this.done.get() && !Thread.currentThread().isInterrupted()) { + try { + FSPaths prevPathToClose = pathsToClose.take(); + if (prevPathToClose != null) { + int length = (this.closeAllWriters) ? prevPathToClose.outWriters.length : 1; + close(prevPathToClose, length); + } + } catch(InterruptedException ie) { + //safe to ignore + } catch(HiveException t) { + //TODO: Should we shutdown processing? Check with Vikram/Prashant + } + } + LOG.info("Exiting.." + Thread.currentThread().getId()); + } + + private void close(FSPaths fsp, int length) throws HiveException { + long sTime = System.currentTimeMillis(); + fsp.closeWriters(false); + + // before closing the operator check if statistics gathering is requested + // and is provided by record writer. this is different from the statistics + // gathering done in processOp(). In processOp(), for each row added + // serde statistics about the row is gathered and accumulated in hashmap. + // this adds more overhead to the actual processing of row. But if the + // record writer already gathers the statistics, it can simply return the + // accumulated statistics which will be aggregated in case of spray writers + if (conf.isGatherStats() && isCollectRWStats) { + for (int idx = 0; idx < length; idx++) { + FSRecordWriter outWriter = fsp.outWriters[idx]; + if (outWriter != null) { + SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); + if (stats != null) { + fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); + } + } + } + } + if (isNativeTable && commit) { + LOG.info("Committing..."); + fsp.commit(fs); + } else { + for (int idx = 0; idx < length; idx++) { + fsp.outWriters[idx] = null; + } + fsp = null; + } + LOG.info("time taken for closeWriters=" + (System.currentTimeMillis() - sTime) + + "; writerLen=" + length + "; queueLength=" + pathToBeClosedList.size() + + "; pathToBeCommitted=" + pathToBeCommitedList.size()); + } + } /** * RecordWriter. * @@ -295,6 +380,23 @@ protected void initializeOp(Configuration hconf) throws HiveException { serializer.initialize(null, conf.getTableInfo().getProperties()); outputClass = serializer.getSerializedClass(); + int size = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEOPT_FILESINKOPERATOR_PATHS_MAX_SIZE); + LOG.info(HiveConf.ConfVars.HIVEOPT_FILESINKOPERATOR_PATHS_MAX_SIZE + "=" + size); + pathToBeClosedList = new LinkedBlockingQueue(size); + pathToBeCommitedList = new LinkedBlockingQueue(size); + + + int threads = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEOPT_FILESINKOPERATOR_CLOSE_THREADS_SIZE); + closeThread = new CloseThread[threads]; + commiterThread = new CloseThread[threads]; + for(int i=0;i row, String lbDirName) throws HiveException { - + long sTime = System.currentTimeMillis(); FSPaths fp; // get the path corresponding to the dynamic partition columns, @@ -751,33 +853,23 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive } if (!conf.getDpSortState().equals(DPSortState.NONE) && prevFsp != null) { - // close the previous fsp as it is no longer needed - prevFsp.closeWriters(false); - - // since we are closing the previous fsp's record writers, we need to see if we can get - // stats from the record writer and store in the previous fsp that is cached - if (conf.isGatherStats() && isCollectRWStats) { - FSRecordWriter outWriter = prevFsp.outWriters[0]; - if (outWriter != null) { - SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); - if (stats != null) { - prevFsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); - prevFsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); - } + try { + pathToBeClosedList.put(prevFsp); + if (LOG.isDebugEnabled()) { + LOG.debug("pathToBeClosed(after addition) size=" + pathToBeClosedList.size()); } + } catch(InterruptedException ie) { + //TODO: Ignore as of now. } - - // let writers release the memory for garbage collection - prevFsp.outWriters[0] = null; - prevFsp = null; } - fsp2 = createNewPaths(dpDir); if (prevFsp == null) { prevFsp = fsp2; } - + LOG.info("time taken for createNewPaths=" + + (System.currentTimeMillis() - sTime) + "; totalFiles=" + + totalFiles + "; pathToBeClosed size=" + pathToBeClosedList.size()); if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { createBucketForFileIdx(fsp2, 0); valToPaths.put(pathKey, fsp2); @@ -787,6 +879,7 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive } else { fp = fsp; } + LOG.info("Time taken for getDynOutPaths : " + (System.currentTimeMillis() - sTime)); return fp; } @@ -836,40 +929,20 @@ public void closeOp(boolean abort) throws HiveException { if (!bDynParts && !filesCreated) { createBucketFiles(fsp); } - + LOG.info("Executing closeOp with abort=" + abort); lastProgressReport = System.currentTimeMillis(); if (!abort) { for (FSPaths fsp : valToPaths.values()) { - fsp.closeWriters(abort); - - // before closing the operator check if statistics gathering is requested - // and is provided by record writer. this is different from the statistics - // gathering done in processOp(). In processOp(), for each row added - // serde statistics about the row is gathered and accumulated in hashmap. - // this adds more overhead to the actual processing of row. But if the - // record writer already gathers the statistics, it can simply return the - // accumulated statistics which will be aggregated in case of spray writers - if (conf.isGatherStats() && isCollectRWStats) { - for (int idx = 0; idx < fsp.outWriters.length; idx++) { - FSRecordWriter outWriter = fsp.outWriters[idx]; - if (outWriter != null) { - SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); - if (stats != null) { - fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); - fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); - } - } - } + try { + pathToBeCommitedList.put(fsp); + } catch(InterruptedException ie) { + //safe to ignore } - - if (isNativeTable) { - fsp.commit(fs); + if (LOG.isDebugEnabled()) { + LOG.debug("Enqueuing to pathToBeCommitedList (size)=" + pathToBeCommitedList.size() + + ", pathsToBeClosedList (size)=" + pathToBeClosedList.size()); } } - // Only publish stats if this operator's flag was set to gather stats - if (conf.isGatherStats()) { - publishStats(); - } } else { // Will come here if an Exception was thrown in map() or reduce(). // Hadoop always call close() even if an Exception was thrown in map() or @@ -879,6 +952,33 @@ public void closeOp(boolean abort) throws HiveException { } } fsp = prevFsp = null; + + //Wait until all the paths are closed + while(!pathToBeClosedList.isEmpty() || !pathToBeCommitedList.isEmpty()) { + LOG.info("Sleeping...pathToBeClosed or pathToBeCommittedList is not " + + "empty..wait for sometime..pathToBeClosedList=" + + pathToBeClosedList.size() + "; pathsToBeCommittedList=" + + pathToBeCommitedList.size()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + //Good to go and shutdown the threads. + done.set(true); + commit.set(true); + + // Only publish stats if this operator's flag was set to gather stats + // Wait for all the threads to finish closing the writers + if (!abort && conf.isGatherStats()) { + publishStats(); + } + + for(int i=0;i