From 3fb307601d38e150c8cf8099d971e4f05c3cb4ff Mon Sep 17 00:00:00 2001 From: Peter Slawski Date: Tue, 5 Apr 2016 16:10:56 -0700 Subject: [PATCH] HIVE-13512: Make initializing dag ids in TezWork thread safe for parallel compilation --- .../org/apache/hadoop/hive/ql/plan/TezWork.java | 9 ++- .../hive/ql/plan/TestTezWorkConcurrency.java | 65 ++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index c6ef829..7a70e6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -69,7 +70,7 @@ public static boolean isCustomInputType(VertexType vertex) { private static transient final Logger LOG = LoggerFactory.getLogger(TezWork.class); - private static int counter; + private static final AtomicInteger counter = new AtomicInteger(1); private final String dagId; private final String queryName; private final Set roots = new HashSet(); @@ -80,8 +81,12 @@ public static boolean isCustomInputType(VertexType vertex) { new HashMap, TezEdgeProperty>(); private final Map workVertexTypeMap = new HashMap(); + public TezWork(String queryId) { + this(queryId, null); + } + public TezWork(String queryId, Configuration conf) { - this.dagId = queryId + ":" + (++counter); + this.dagId = queryId + ":" + counter.getAndIncrement(); String queryName = (conf != null) ? DagUtils.getUserSpecifiedDagName(conf) : null; if (queryName == null) { queryName = this.dagId; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java new file mode 100644 index 0000000..c59fd10 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hive.ql.plan; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Test; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; + +import static org.junit.Assert.assertEquals; + +public final class TestTezWorkConcurrency { + + @Test + public void ensureDagIdIsUnique() throws Exception { + final int threadCount = 5; + final CountDownLatch threadReadyToStartSignal = new CountDownLatch(threadCount); + final CountDownLatch startThreadSignal = new CountDownLatch(1); + final int numberOfTezWorkToCreatePerThread = 100; + + List>> tasks = Lists.newArrayList(); + for (int i = 0; i < threadCount; i++) { + tasks.add(new FutureTask<>(new Callable>() { + @Override + public Set call() throws Exception { + threadReadyToStartSignal.countDown(); + startThreadSignal.await(); + return generateTezWorkDagIds(numberOfTezWorkToCreatePerThread); + } + })); + } + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + for (FutureTask> task : tasks) { + executor.execute(task); + } + threadReadyToStartSignal.await(); + startThreadSignal.countDown(); + Set allTezWorkDagIds = getAllTezWorkDagIds(tasks); + assertEquals(threadCount * numberOfTezWorkToCreatePerThread, allTezWorkDagIds.size()); + } + + private static Set generateTezWorkDagIds(int numberOfNames) { + Set tezWorkIds = Sets.newHashSet(); + for (int i = 0; i < numberOfNames; i++) { + TezWork work = new TezWork("query-id"); + tezWorkIds.add(work.getDagId()); + } + return tezWorkIds; + } + + private static Set getAllTezWorkDagIds(List>> tasks) + throws ExecutionException, InterruptedException { + Set allTezWorkDagIds = Sets.newHashSet(); + for (FutureTask> task : tasks) { + allTezWorkDagIds.addAll(task.get()); + } + return allTezWorkDagIds; + } +} -- 2.5.4 (Apple Git-61)