From e1fbecb18ca283dd88402ca974163c53e6310c28 Mon Sep 17 00:00:00 2001 From: Gopal V Date: Mon, 6 Apr 2015 20:56:16 -0700 Subject: [PATCH] orc syncs --- .../hadoop/hive/ql/io/orc/MemoryManager.java | 39 ++++++++++++++---- .../org/apache/hadoop/hive/ql/io/orc/OrcFile.java | 15 ++++--- .../apache/hadoop/hive/ql/io/orc/WriterImpl.java | 46 +++++++++++----------- 3 files changed, 66 insertions(+), 34 deletions(-) diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java index 821bd35..6432d6e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java @@ -24,10 +24,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import com.google.common.base.Preconditions; + import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; /** * Implements a memory manager that keeps a global context of how many ORC @@ -35,9 +38,9 @@ * dynamic partitions, it is easy to end up with many writers in the same task. * By managing the size of each allocation, we try to cut down the size of each * allocation and keep the task from running out of memory. - * - * This class is thread safe and uses synchronization around the shared state - * to prevent race conditions. + * + * This class is not thread safe, but is re-entrant - ensure creation and all + * invocations are triggered from the same thread. */ class MemoryManager { @@ -54,6 +57,14 @@ private long totalAllocation = 0; private double currentScale = 1; private int rowsAddedSinceCheck = 0; + private final OwnedLock ownerLock = new OwnedLock(); + + @SuppressWarnings("serial") + private static class OwnedLock extends ReentrantLock { + public Thread getOwner() { + return super.getOwner(); + } + } private static class WriterInfo { long allocation; @@ -84,6 +95,17 @@ double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal); totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). getHeapMemoryUsage().getMax() * maxLoad); + ownerLock.lock(); + } + + /** + * Light weight thread-safety check for multi-threaded access patterns + */ + private void checkOwner() { + Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(), + "Owner thread expected %s, got %s", + ownerLock.getOwner(), + Thread.currentThread()); } /** @@ -92,8 +114,9 @@ * @param path the file that is being written * @param requestedAllocation the requested buffer size */ - synchronized void addWriter(Path path, long requestedAllocation, + void addWriter(Path path, long requestedAllocation, Callback callback) throws IOException { + checkOwner(); WriterInfo oldVal = writerList.get(path); // this should always be null, but we handle the case where the memory // manager wasn't told that a writer wasn't still in use and the task @@ -115,7 +138,8 @@ synchronized void addWriter(Path path, long requestedAllocation, * Remove the given writer from the pool. * @param path the file that has been closed */ - synchronized void removeWriter(Path path) throws IOException { + void removeWriter(Path path) throws IOException { + checkOwner(); WriterInfo val = writerList.get(path); if (val != null) { writerList.remove(path); @@ -144,7 +168,7 @@ long getTotalMemoryPool() { * @return a fraction between 0.0 and 1.0 of the requested size that is * available for each writer. */ - synchronized double getAllocationScale() { + double getAllocationScale() { return currentScale; } @@ -152,7 +176,7 @@ synchronized double getAllocationScale() { * Give the memory manager an opportunity for doing a memory check. * @throws IOException */ - synchronized void addedRow() throws IOException { + void addedRow() throws IOException { if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { notifyWriters(); } @@ -163,6 +187,7 @@ synchronized void addedRow() throws IOException { * @throws IOException */ void notifyWriters() throws IOException { + checkOwner(); LOG.debug("Notifying writers after " + rowsAddedSinceCheck); for(WriterInfo writer: writerList.values()) { boolean flushed = writer.callback.checkMemory(currentScale); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index 49a8e80..2cf7c79 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -503,14 +503,19 @@ public static Writer createWriter(FileSystem fs, .rowIndexStride(rowIndexStride)); } - private static MemoryManager memoryManager = null; + private static ThreadLocal memoryManager = null; - private static synchronized - MemoryManager getMemoryManager(Configuration conf) { + private static synchronized MemoryManager getMemoryManager( + final Configuration conf) { if (memoryManager == null) { - memoryManager = new MemoryManager(conf); + memoryManager = new ThreadLocal() { + @Override + protected MemoryManager initialValue() { + return new MemoryManager(conf); + } + }; } - return memoryManager; + return memoryManager.get(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index a319204..d3172a3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -95,9 +95,14 @@ * sub-types. Each of the TreeWriters writes the column's data as a set of * streams. * - * This class is synchronized so that multi-threaded access is ok. In - * particular, because the MemoryManager is shared between writers, this class - * assumes that checkMemory may be called from a separate thread. + * This class is unsynchronized like most Stream objects, so from the creation of an OrcFile and all + * access to a single instance has to be from a single thread. + * + * There are no known cases where these happen between different threads today. + * + * Caveat: the MemoryManager is created during WriterOptions create, that has to be confined to a single + * thread as well. + * */ public class WriterImpl implements Writer, MemoryManager.Callback { @@ -341,7 +346,7 @@ public static CompressionCodec createCodec(CompressionKind kind) { } @Override - public synchronized boolean checkMemory(double newScale) throws IOException { + public boolean checkMemory(double newScale) throws IOException { long limit = (long) Math.round(adjustedStripeSize * newScale); long size = estimateStripeSize(); if (LOG.isDebugEnabled()) { @@ -2400,21 +2405,19 @@ private long estimateStripeSize() { } @Override - public synchronized void addUserMetadata(String name, ByteBuffer value) { + public void addUserMetadata(String name, ByteBuffer value) { userMetadata.put(name, ByteString.copyFrom(value)); } @Override public void addRow(Object row) throws IOException { - synchronized (this) { - treeWriter.write(row); - rowsInStripe += 1; - if (buildIndex) { - rowsInIndex += 1; - - if (rowsInIndex >= rowIndexStride) { - createRowIndexEntry(); - } + treeWriter.write(row); + rowsInStripe += 1; + if (buildIndex) { + rowsInIndex += 1; + + if (rowsInIndex >= rowIndexStride) { + createRowIndexEntry(); } } memoryManager.addedRow(); @@ -2428,13 +2431,12 @@ public void close() throws IOException { // remove us from the memory manager so that we don't get any callbacks memoryManager.removeWriter(path); // actually close the file - synchronized (this) { - flushStripe(); - int metadataLength = writeMetadata(rawWriter.getPos()); - int footerLength = writeFooter(rawWriter.getPos() - metadataLength); - rawWriter.writeByte(writePostScript(footerLength, metadataLength)); - rawWriter.close(); - } + flushStripe(); + int metadataLength = writeMetadata(rawWriter.getPos()); + int footerLength = writeFooter(rawWriter.getPos() - metadataLength); + rawWriter.writeByte(writePostScript(footerLength, metadataLength)); + rawWriter.close(); + } /** @@ -2456,7 +2458,7 @@ public long getNumberOfRows() { } @Override - public synchronized long writeIntermediateFooter() throws IOException { + public long writeIntermediateFooter() throws IOException { // flush any buffered rows flushStripe(); // write a footer -- 1.9.5 (Apple Git-50.3)