Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: Data Shuffle
    • Labels: None

      Description

      Currently Tajo creates too many intermediate files in the case of hash shuffle. An execution block (SubQuery) on a TajoWorker creates intermediate files according to the following rule:

      # intermediate files in a worker = (# tasks / # workers) * # partitions

      This may cause 'too many open files' errors and makes it difficult to scale out. To solve this problem, we should reduce the number of hash shuffle output files.
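
      For a sense of scale, consider a hypothetical example (the figures are illustrative, not from this issue): a query with 1,000 tasks spread over 10 workers and 32 shuffle partitions creates about (1,000 / 10) * 32 = 3,200 intermediate files per worker. Sharing one appender per (execution block, partition) pair, as the patch below does, caps this at 32 files per worker for that execution block.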

        Activity

        githubbot ASF GitHub Bot added a comment -

        GitHub user babokim opened a pull request:

        https://github.com/apache/tajo/pull/115

        TAJO-992: Reduce number of hash shuffle output file.

        For this I added the following features.

        • HashShuffleAppender, created as a single instance per ExecutionBlock and partition in a worker.
          Therefore, all of an execution block's tasks in a worker share one HashShuffleAppender. Each task's HashShuffleWriteExec calls HashShuffleAppender.addTuples() once every 'tajo.shuffle.hash.appender.buffer.size' tuples (default: 10,000) to keep the shared lock coarse-grained (see the sketch after this list).
        • Splittable IntermediateEntry
          If an intermediate file is large, it is difficult to process with multiple tasks. The new IntermediateEntry class carries page metadata, a start position and length recorded for every 'tajo.shuffle.hash.appender.page.volumn-mb' of data (default: 30MB). The Repartitioner class uses that metadata to create a proper number of tasks.
        • Failure-aware IntermediateEntry
          If a task fails, the failed task's tuples should be removed from the intermediate file. That cannot be done directly, because the tuples are already written to the file. For this, IntermediateEntry keeps per-task tuple index metadata, which RawFile's scanner can use. This metadata is not used in this patch; I'll create another issue for it.
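
        To make the sharing scheme concrete, here is a minimal, hedged Java sketch of the first feature. The names mirror the patch, but the types and bodies are simplified placeholders, not Tajo's actual classes: one writer per (execution block, partition) handed out from a synchronized map, with each task buffering tuples locally and appending in batches so the shared lock is taken once per batch rather than once per tuple.

        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.Iterator;
        import java.util.List;
        import java.util.Map;

        // Hedged sketch of the shared-appender idea; not the actual Tajo implementation.
        class SharedAppenderSketch {
          // Mirrors 'tajo.shuffle.hash.appender.buffer.size' (default: 10,000 tuples per batch).
          static final int BUFFER_SIZE = 10_000;

          private final Map<String, PartitionWriter> appenders = new HashMap<>();

          // All tasks of one execution block asking for the same partition get one writer.
          synchronized PartitionWriter getAppender(String ebId, int partId) {
            return appenders.computeIfAbsent(ebId + "/" + partId, k -> new PartitionWriter());
          }

          static class PartitionWriter {
            private long offset = 0;

            // Coarse-grained locking: one lock acquisition writes a whole batch.
            synchronized long addTuples(List<byte[]> tuples) {
              for (byte[] t : tuples) offset += t.length; // stand-in for the real file append
              return offset;
            }
          }

          // Roughly what a task-side writer (HashShuffleWriteExec in the patch) would do.
          void writeFromTask(String ebId, int partId, Iterator<byte[]> tuples) {
            PartitionWriter writer = getAppender(ebId, partId);
            List<byte[]> buffer = new ArrayList<>(BUFFER_SIZE);
            while (tuples.hasNext()) {
              buffer.add(tuples.next());
              if (buffer.size() == BUFFER_SIZE) { writer.addTuples(buffer); buffer.clear(); }
            }
            if (!buffer.isEmpty()) writer.addTuples(buffer); // flush the remainder
          }
        }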

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/babokim/tajo TAJO-992

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/tajo/pull/115.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #115


        commit f020bdd0ead06de5903a251fe02a534880420e35
        Author: 김형준 <babokim@babokim-macbook-pro.local>
        Date: 2014-08-05T11:26:38Z

        TAJO-992: Reduce number of hash shuffle output file.

        commit 36c98e20d118c8f217d7c065b574136847174f8a
        Author: 김형준 <babokim@babokim-macbook-pro.local>
        Date: 2014-08-05T12:15:41Z

        Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo

        Conflicts:
        tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java

        commit 06045064ec32b6111ece0abf7343402e419ca608
        Author: 김형준 <babokim@babokim-macbook-pro.local>
        Date: 2014-08-06T13:57:35Z

        TAJO-992: Reduce number of hash shuffle output file.

        commit 028f498eb18c9094b8ac7641d628ec58e3ffb605
        Author: HyoungJun Kim <babokim@babokim-macbook-pro.local>
        Date: 2014-08-11T21:37:52Z

        TAJO-992: Reduce number of hash shuffle output file.
        Splittable IntermediateEntry.

        commit e02f0cdf14b502dd949cf9cc5e7c0893ec312e10
        Author: HyoungJun Kim <babokim@babokim-mbp.server.gruter.com>
        Date: 2014-08-12T05:56:36Z

        TAJO-992: Reduce number of hash shuffle output file.
        Add some debug logs

        commit 2d49339111be67d058158431a94680ab2749000d
        Author: HyoungJun Kim <babokim@babokim-mbp.server.gruter.com>
        Date: 2014-08-12T06:09:51Z

        TAJO-992: Reduce number of hash shuffle output file.
        Remove unused log

        commit 88775ef40071c8c40eec63371c8e3523658886f0
        Author: HyoungJun Kim <babokim@babokim-macbook-pro.local>
        Date: 2014-08-13T11:34:07Z

        Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-992

        Conflicts:
        tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
        tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
        tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
        tajo-core/src/main/java/org/apache/tajo/worker/Task.java
        tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
        tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
        tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
        tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java

        commit 98e6314ab4974453035647f2bc78940fcb096d9e
        Author: HyoungJun Kim <babokim@babokim-macbook-pro.local>
        Date: 2014-08-13T12:36:06Z

        TAJO-992: Reduce number of hash shuffle output file.
        Fix a wrong calculation of Bytes in StorageUnit


        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16338083

        — Diff: tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java —
        @@ -923,20 +960,62 @@ public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext sche

             LOG.info(subQuery.getId()
                 + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
        -        + ", DeterminedTaskNum : " + fetches.size());
        +        + ", Intermediate Size: " + totalIntermediateSize
        +        + ", splitSize: " + splitVolume
        +        + ", DeterminedTaskNum: " + fetches.size());
           }

        -  static class IntermediateEntryComparator implements Comparator<IntermediateEntry> {
        +  /**
        +   * If a IntermediateEntry is large than splitVolume, List<FetchImpl> has single element.
        +   * @param ebId
        +   * @param entries
        +   * @param splitVolume
        +   * @return
        +   */
        +  public static List<List<FetchImpl>> splitOrMergeIntermediates(
        +      ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
        +    // Each List<FetchImpl> has splitVolume size.
        +    List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
        +
        +    Iterator<IntermediateEntry> iter = entries.iterator();
        +    if (!iter.hasNext()) {
        +      return null;
        +    }
        +    List<FetchImpl> fetchListForSingleTask = new ArrayList<FetchImpl>();
        +    long fetchListVolume = 0;

        -    @Override
        -    public int compare(IntermediateEntry o1, IntermediateEntry o2) {
        -      int cmp = Ints.compare(o1.getPartId(), o2.getPartId());
        -      if (cmp != 0) {
        -        return cmp;
        +    while (iter.hasNext()) {
        +      IntermediateEntry currentInterm = iter.next();
        +
        +      long firstSplitVolume = splitVolume - fetchListVolume;
        +      if (firstSplitVolume < pageSize) {
        +        firstSplitVolume = splitVolume;
        +      }
        +      List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);

        — End diff –

        Could you add some comments on what each long of the pair means?
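
        For readers following the hunk above, here is a hedged sketch of the volume-based packing that splitOrMergeIntermediates() performs: entries accumulate into one fetch list until it holds about splitVolume bytes, then a new list is started. The types are simplified (a plain long stands in for an IntermediateEntry's volume), and the real method additionally splits a single oversized entry across page boundaries, which this sketch omits.

        import java.util.ArrayList;
        import java.util.List;

        // Hedged sketch only; not the actual Repartitioner code.
        class FetchPackingSketch {
          static List<List<Long>> packByVolume(List<Long> entryVolumes, long splitVolume) {
            List<List<Long>> groups = new ArrayList<>();
            List<Long> current = new ArrayList<>();
            long currentVolume = 0;
            for (long volume : entryVolumes) {
              current.add(volume);
              currentVolume += volume;
              if (currentVolume >= splitVolume) { // this group now feeds one task
                groups.add(current);
                current = new ArrayList<>();
                currentVolume = 0;
              }
            }
            if (!current.isEmpty()) groups.add(current); // remainder becomes the last task
            return groups;
          }
        }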

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16338719

        — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java —
        @@ -0,0 +1,240 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + *     http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.tajo.storage;
        +
        +import org.apache.commons.logging.Log;
        +import org.apache.commons.logging.LogFactory;
        +import org.apache.hadoop.fs.FileStatus;
        +import org.apache.hadoop.fs.FileSystem;
        +import org.apache.hadoop.fs.LocalDirAllocator;
        +import org.apache.hadoop.fs.Path;
        +import org.apache.tajo.ExecutionBlockId;
        +import org.apache.tajo.QueryUnitAttemptId;
        +import org.apache.tajo.catalog.Schema;
        +import org.apache.tajo.catalog.TableMeta;
        +import org.apache.tajo.conf.TajoConf;
        +import org.apache.tajo.conf.TajoConf.ConfVars;
        +import org.apache.tajo.util.Pair;
        +
        +import java.io.IOException;
        +import java.util.ArrayList;
        +import java.util.Collection;
        +import java.util.List;
        +import java.util.Map;
        +import java.util.concurrent.ConcurrentHashMap;
        +
        +public class HashShuffleAppenderManager {
        +  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
        +
        +  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
        +      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
        +  private TajoConf systemConf;
        +  private FileSystem defaultFS;
        +  private FileSystem localFS;
        +  private LocalDirAllocator lDirAllocator;
        +  private int pageSize;
        +
        +  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
        +    this.systemConf = systemConf;
        +
        +    // initialize LocalDirAllocator
        +    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
        +
        +    // initialize DFS and LocalFileSystems
        +    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
        +    localFS = FileSystem.getLocal(systemConf);
        +    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
        +  }
        +
        +  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
        +                                         TableMeta meta, Schema outSchema) throws IOException {
        +    synchronized (appenderMap) {
        +      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
        +
        +      if (partitionAppenderMap == null) {
        +        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
        +        appenderMap.put(ebId, partitionAppenderMap);
        +      }
        +
        +      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
        +      if (partitionAppenderMeta == null) {
        +        Path dataFile = getDataFile(ebId, partId);
        +        FileSystem fs = dataFile.getFileSystem(systemConf);
        +        if (fs.exists(dataFile)) {
        +          FileStatus status = fs.getFileStatus(dataFile);
        +          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
        +        }
        +
        +        if (!fs.exists(dataFile.getParent())) {
        +          fs.mkdirs(dataFile.getParent());
        +        }
        +        FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(
        +            tajoConf).getAppender(meta, outSchema, dataFile);
        +        appender.enableStats();
        +        appender.init();
        +
        +        partitionAppenderMeta = new PartitionAppenderMeta();
        +        partitionAppenderMeta.partId = partId;
        +        partitionAppenderMeta.dataFile = dataFile;
        +        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
        +        partitionAppenderMeta.appender.init();
        +        partitionAppenderMap.put(partId, partitionAppenderMeta);
        +
        +        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
        +      }
        +
        +      return partitionAppenderMeta.appender;
        +    }
        +  }
        +
        +  public static int getPartParentId(int partId, TajoConf tajoConf) {
        +    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
        +  }
        +
        +  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
        +    try {
        +      // the base dir for an output dir
        +      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
        +      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
        +      //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
        +
        +      // If EB has many partition, too many shuffle file are in single directory.
        +      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
        +    } catch (Exception e) {
        +      LOG.error(e.getMessage(), e);
        +      throw new IOException(e);
        +    }
        +  }
        +
        +  public Path getPartitionAppenderDataFile(ExecutionBlockId ebId, int partId) {

        — End diff –

        It seems to be unused.

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16339612

        — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java —
        @@ -0,0 +1,240 @@
        [... identical to the HashShuffleAppenderManager.java hunk quoted in the previous review comment, through getAppender() ...]
        +
        +  public static int getPartParentId(int partId, TajoConf tajoConf) {
        +    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);

        — End diff –

        This method is invoked repeatedly by getAppender(), but tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS) is a fixed value. I propose changing it to a constant initialized in the constructor.
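
        If it helps to visualize the suggestion, here is a minimal sketch of the proposed change. The field name is illustrative, other members are elided, and note that making the method non-static would also change its call sites:

        // Sketch only: read the fixed config value once in the constructor.
        private final int hashShuffleParentDirs; // illustrative field name

        public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
          // ... existing initialization as in the diff above ...
          this.hashShuffleParentDirs = systemConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
        }

        public int getPartParentId(int partId) { // no longer needs the TajoConf parameter
          return partId % hashShuffleParentDirs;
        }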

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16339727

        — Diff: tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java —
        @@ -207,10 +208,13 @@ public void init(Configuration conf) {
             selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);

             localFS = new LocalFileSystem();
        -    super.init(new Configuration(conf));
        +    //super.init(new Configuration(conf));

        — End diff –

        Please remove the commented-out line.

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16339811

        — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java —
        @@ -0,0 +1,240 @@
        [... identical to the HashShuffleAppenderManager.java hunk quoted in the earlier review comment, through getDataFile() ...]
        +
        +  public Path getPartitionAppenderDataFile(ExecutionBlockId ebId, int partId) {
        +    synchronized (appenderMap) {
        +      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
        +      if (partitionAppenderMap != null) {
        +        PartitionAppenderMeta meta = partitionAppenderMap.get(partId);
        +        if (meta != null) {
        +          return meta.dataFile;
        +        }
        +      }
        +    }
        +
        +    LOG.warn("Can't find HashShuffleAppender:" + ebId + ", part=" + partId);
        +    return null;
        +  }
        +
        +  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
        +    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
        +    synchronized (appenderMap) {
        +      partitionAppenderMap = appenderMap.remove(ebId);
        +    }
        +
        +    if (partitionAppenderMap == null) {
        +      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
        +      return null;
        +    }
        +
        +    // Send Intermediate data to QueryMaster.
        +    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
        +    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
        +      try {
        +        eachMeta.appender.close();
        +        HashShuffleIntermediate intermediate =
        +            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
        +                eachMeta.appender.getPages(),
        +                eachMeta.appender.getMergedTupleIndexes());
        +        intermEntries.add(intermediate);
        +      } catch (IOException e) {
        +        LOG.error(e.getMessage(), e);
        +        throw e;
        +      }
        +    }
        +
        +    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
        +
        +    return intermEntries;
        +  }
        +
        +  public void taskFinished(QueryUnitAttemptId taskId) {

        — End diff –

        In my view, finalizeTask or cleanupTask is a more proper name for this purpose.
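
        Taken together, the manager's API in the diff above implies roughly the following worker-side lifecycle. This is a hedged sketch: the method names come from the quoted diff, but the surrounding variables (systemConf, tajoConf, ebId, partId, meta, outSchema, taskAttemptId, bufferedTuples) are hypothetical stand-ins, not actual Tajo call sites.

        // Hypothetical worker-side flow for one execution block (sketch only).
        HashShuffleAppenderManager manager = new HashShuffleAppenderManager(systemConf);

        // 1. Every task writing partition `partId` of execution block `ebId` receives
        //    the same shared appender instance.
        HashShuffleAppender appender = manager.getAppender(tajoConf, ebId, partId, meta, outSchema);

        // 2. Tasks append tuples in batches; the appender records per-task tuple index
        //    ranges so a failed task's rows can later be skipped.
        appender.addTuples(taskAttemptId, bufferedTuples);

        // 3. When the execution block finishes, close() seals each partition file and
        //    returns one HashShuffleIntermediate per partition (offset, page list,
        //    merged tuple indexes) for reporting to the QueryMaster.
        List<HashShuffleIntermediate> intermediates = manager.close(ebId);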

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16339900

        — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java —
        @@ -0,0 +1,204 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + *     http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.tajo.storage;
        +
        +import org.apache.commons.logging.Log;
        +import org.apache.commons.logging.LogFactory;
        +import org.apache.tajo.ExecutionBlockId;
        +import org.apache.tajo.QueryUnitAttemptId;
        +import org.apache.tajo.catalog.statistics.TableStats;
        +import org.apache.tajo.util.Pair;
        +
        +import java.io.IOException;
        +import java.util.ArrayList;
        +import java.util.HashMap;
        +import java.util.List;
        +import java.util.Map;
        +import java.util.concurrent.atomic.AtomicBoolean;
        +
        +public class HashShuffleAppender implements Appender {
        +  private static Log LOG = LogFactory.getLog(HashShuffleAppender.class);
        +
        +  private FileAppender appender;
        +  private AtomicBoolean closed = new AtomicBoolean(false);
        +  private int partId;
        +
        +  private TableStats tableStats;
        +
        +  //<taskId, <page start offset, <task start, task end>>>
        +  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
        +
        +  //page start offset, length
        +  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
        +
        +  private Pair<Long, Integer> currentPage;
        +
        +  private int pageSize; //MB
        +
        +  private int rowNumInPage;
        +
        +  private int totalRows;
        +
        +  private long offset;
        +
        +  private ExecutionBlockId ebId;
        +
        +  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
        +    this.ebId = ebId;
        +    this.partId = partId;
        +    this.appender = appender;
        +    this.pageSize = pageSize;
        +  }
        +
        +  @Override
        +  public void init() throws IOException {
        +    currentPage = new Pair(0L, 0);
        +    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
        +    rowNumInPage = 0;
        +  }
        +
        +  /**
        +   * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition.
        +   * After writing if a current page exceeds pageSize, pageOffset will be added.
        +   * @param taskId
        +   * @param tuples
        +   * @return written bytes
        +   * @throws IOException
        +   */
        +  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
        +    synchronized(appender) {
        +      if (closed.get()) {
        +        return 0;
        +      }
        +      long currentPos = appender.getOffset();
        +
        +      for (Tuple eachTuple: tuples) {
        +        appender.addTuple(eachTuple);
        +      }
        +      long posAfterWritten = appender.getOffset();
        +
        +      int writtenBytes = (int)(posAfterWritten - currentPos);
        +
        +      int nextRowNum = rowNumInPage + tuples.size();
        +      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
        +      if (taskIndexes == null) {
        +        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
        +        taskTupleIndexes.put(taskId, taskIndexes);
        +      }
        +      taskIndexes.add(
        +          new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
        +      rowNumInPage = nextRowNum;
        +
        +      if (posAfterWritten - currentPage.getFirst() > pageSize) {
        +        nextPage(posAfterWritten);
        +        rowNumInPage = 0;
        +      }
        +
        +      totalRows += tuples.size();
        +      return writtenBytes;
        +    }
        +  }
        +
        +  public long getOffset() throws IOException {
        +    if (closed.get()) {
        +      return offset;
        +    } else {
        +      return appender.getOffset();
        +    }
        +  }
        +
        +  private void nextPage(long pos) {
        +    currentPage.setSecond((int) (pos - currentPage.getFirst()));
        +    pages.add(currentPage);
        +    currentPage = new Pair(pos, 0);
        +  }
        +
        +  @Override
        +  public void addTuple(Tuple t) throws IOException {
        +    throw new IOException("Not support addTuple, use addTuples()");
        +  }
        +
        +  @Override
        +  public void flush() throws IOException {
        +    synchronized(appender) {
        +      if (closed.get()) {
        +        return;
        +      }
        +      appender.flush();
        +    }
        +  }
        +
        +  @Override
        +  public void close() throws IOException {
        +    synchronized(appender) {
        +      if (closed.get()) {
        +        return;
        +      }
        +      appender.flush();
        +      offset = appender.getOffset();
        +      if (offset > currentPage.getFirst()) {
        +        nextPage(offset);
        +      }
        +      appender.close();
        +      if (LOG.isDebugEnabled()) {
        +        if (!pages.isEmpty()) {
        +          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
        +              + ", lastPage=" + pages.get(pages.size() - 1));
        +        } else {
        +          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
        +        }
        +      }
        +      closed.set(true);
        +      tableStats = appender.getStats();
        +    }
        +  }
        +
        +  @Override
        +  public void enableStats() {
        +  }
        +
        +  @Override
        +  public TableStats getStats() {
        +    synchronized(appender) {
        +      return appender.getStats();
        +    }
        +  }
        +
        +  public List<Pair<Long, Integer>> getPages() {
        +    return pages;
        +  }
        +
        +  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
        +    return taskTupleIndexes;
        +  }
        +
        +  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
        +    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
        +
        +    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
        +      merged.addAll(eachFailureIndex);
        +    }
        +
        +    return merged;
        +  }
        +
        +  public void taskFinished(QueryUnitAttemptId taskId) {

        — End diff –

In my view, finishTask or finalizeTask may be a more proper name for this purpose.
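To summarize the paging logic quoted above for readers skimming the thread: addTuples() appends a batch of tuples, records which page the batch landed in, and rolls over to a new page once the current one grows past pageSize. Below is a minimal self-contained sketch of that bookkeeping only, using plain long[] pairs instead of Tajo's Pair type and a simulated byte counter instead of a real FileAppender; the 30MB page volume is the default named in the issue description, hardcoded here purely for illustration.

import java.util.ArrayList;
import java.util.List;

public class PagingSketch {
  public static void main(String[] args) {
    final long pageSize = 30L * 1024 * 1024;     // assumed 30MB page volume (documented default)
    List<long[]> pages = new ArrayList<long[]>(); // each entry: {page start offset, page length}
    long pageStart = 0;
    long fileOffset = 0;

    for (int i = 0; i < 3; i++) {                // simulate three batches of 20MB each
      fileOffset += 20L * 1024 * 1024;           // bytes appended by this batch of tuples
      if (fileOffset - pageStart > pageSize) {   // current page grew past pageSize
        pages.add(new long[] { pageStart, fileOffset - pageStart });
        pageStart = fileOffset;                  // the next page begins here
      }
    }
    // pages now holds one 40MB page; close() would record the trailing 20MB as a final page
    System.out.println(pages.size() + " page(s), first length=" + pages.get(0)[1]);
  }
}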

        githubbot ASF GitHub Bot added a comment -

        Github user babokim commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16339935

        — Diff: tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java —
@@ -923,20 +960,62 @@ public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext sche

     LOG.info(subQuery.getId()
         + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
-        + ", DeterminedTaskNum : " + fetches.size());
+        + ", Intermediate Size: " + totalIntermediateSize
+        + ", splitSize: " + splitVolume
+        + ", DeterminedTaskNum: " + fetches.size());
   }

-  static class IntermediateEntryComparator implements Comparator<IntermediateEntry> {
+  /**
+   * If an IntermediateEntry is larger than splitVolume, the List<FetchImpl> has a single element.
+   * @param ebId
+   * @param entries
+   * @param splitVolume
+   * @return
+   */
+  public static List<List<FetchImpl>> splitOrMergeIntermediates(
+      ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
+    // Each List<FetchImpl> has splitVolume size.
+    List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
+
+    Iterator<IntermediateEntry> iter = entries.iterator();
+    if (!iter.hasNext()) {
+      return null;
+    }
+    List<FetchImpl> fetchListForSingleTask = new ArrayList<FetchImpl>();
+    long fetchListVolume = 0;

-    @Override
-    public int compare(IntermediateEntry o1, IntermediateEntry o2) {
-      int cmp = Ints.compare(o1.getPartId(), o2.getPartId());
-      if (cmp != 0) {
-        return cmp;
+    while (iter.hasNext()) {
+      IntermediateEntry currentInterm = iter.next();
+
+      long firstSplitVolume = splitVolume - fetchListVolume;
+      if (firstSplitVolume < pageSize) {
+        firstSplitVolume = splitVolume;
+      }
+      List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
— End diff –

Each Pair object in the splits variable is assigned to a task of the next ExecutionBlock.
The first long value is an offset into the intermediate file and the second is the length.
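To make those offset/length pairs concrete, the sketch below cuts a file purely by volume. This is a simplification: the real IntermediateEntry.split() also honors the page boundaries recorded by HashShuffleAppender, and splitByVolume with its parameters are illustrative names, not Tajo APIs.

import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
  // Cuts a file of fileLength bytes into {offset, length} pieces: the first piece is
  // at most firstSplitVolume bytes, every later piece at most splitVolume bytes.
  static List<long[]> splitByVolume(long fileLength, long firstSplitVolume, long splitVolume) {
    List<long[]> splits = new ArrayList<long[]>();
    long offset = 0;
    long limit = firstSplitVolume;
    while (offset < fileLength) {
      long length = Math.min(limit, fileLength - offset);
      splits.add(new long[] { offset, length }); // offset into the file, length of the piece
      offset += length;
      limit = splitVolume;                       // later pieces use the normal split volume
    }
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024 * 1024;
    // a 100MB entry, 20MB of room left in the current fetch list, 40MB split volume
    for (long[] s : splitByVolume(100 * mb, 20 * mb, 40 * mb)) {
      System.out.println("offset=" + s[0] + ", length=" + s[1]);
    }
    // prints (0, 20MB), (20MB, 40MB), (60MB, 40MB)
  }
}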

        githubbot ASF GitHub Bot added a comment -

        Github user babokim commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16339947

        — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java —
        @@ -0,0 +1,240 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.tajo.storage;
        +
        +import org.apache.commons.logging.Log;
        +import org.apache.commons.logging.LogFactory;
        +import org.apache.hadoop.fs.FileStatus;
        +import org.apache.hadoop.fs.FileSystem;
        +import org.apache.hadoop.fs.LocalDirAllocator;
        +import org.apache.hadoop.fs.Path;
        +import org.apache.tajo.ExecutionBlockId;
        +import org.apache.tajo.QueryUnitAttemptId;
        +import org.apache.tajo.catalog.Schema;
        +import org.apache.tajo.catalog.TableMeta;
        +import org.apache.tajo.conf.TajoConf;
        +import org.apache.tajo.conf.TajoConf.ConfVars;
        +import org.apache.tajo.util.Pair;
        +
        +import java.io.IOException;
        +import java.util.ArrayList;
        +import java.util.Collection;
        +import java.util.List;
        +import java.util.Map;
        +import java.util.concurrent.ConcurrentHashMap;
        +
        +public class HashShuffleAppenderManager {
        + private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
        +
        + private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
        + new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
        + private TajoConf systemConf;
        + private FileSystem defaultFS;
        + private FileSystem localFS;
        + private LocalDirAllocator lDirAllocator;
        + private int pageSize;
        +
+ public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
+   this.systemConf = systemConf;
+
+   // initialize LocalDirAllocator
+   lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+   // initialize DFS and LocalFileSystems
+   defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+   localFS = FileSystem.getLocal(systemConf);
+   pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
+ }
+
+ public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
+     TableMeta meta, Schema outSchema) throws IOException {
+   synchronized (appenderMap) {
+     Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+
+     if (partitionAppenderMap == null) {
+       partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
+       appenderMap.put(ebId, partitionAppenderMap);
+     }
+
+     PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
+     if (partitionAppenderMeta == null) {
+       Path dataFile = getDataFile(ebId, partId);
+       FileSystem fs = dataFile.getFileSystem(systemConf);
+       if (fs.exists(dataFile)) {
+         FileStatus status = fs.getFileStatus(dataFile);
+         LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
+       }
+
+       if (!fs.exists(dataFile.getParent())) {
+         fs.mkdirs(dataFile.getParent());
+       }
+       FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(
+           tajoConf).getAppender(meta, outSchema, dataFile);
+       appender.enableStats();
+       appender.init();
+
+       partitionAppenderMeta = new PartitionAppenderMeta();
+       partitionAppenderMeta.partId = partId;
+       partitionAppenderMeta.dataFile = dataFile;
+       partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
+       partitionAppenderMeta.appender.init();
+       partitionAppenderMap.put(partId, partitionAppenderMeta);
+
+       LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
+     }
+
+     return partitionAppenderMeta.appender;
+   }
+ }
+
+ public static int getPartParentId(int partId, TajoConf tajoConf) {
+   return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
+ }
+
+ private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
+   try {
+     // the base dir for an output dir
+     String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
+     Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
+     //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
+
+     // If an EB has many partitions, too many shuffle files end up in a single directory.
+     return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
+   } catch (Exception e) {
+     LOG.error(e.getMessage(), e);
+     throw new IOException(e);
+   }
+ }
+
+ public Path getPartitionAppenderDataFile(ExecutionBlockId ebId, int partId) {
— End diff –

        Yes.
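For readers following along, the heart of getAppender() above is a lazy get-or-create on a map keyed by (ExecutionBlockId, partId) under a coarse lock, which is what lets every task of an execution block share a single appender per partition. A minimal sketch of just that pattern, with plain Strings standing in for the real appender and id types:

import java.util.HashMap;
import java.util.Map;

public class AppenderMapSketch {
  // One shared appender per (executionBlockId, partitionId), created lazily.
  private final Map<String, Map<Integer, String>> appenders =
      new HashMap<String, Map<Integer, String>>();

  String getOrCreate(String ebId, int partId) {
    synchronized (appenders) {                       // one lock covers lookup and creation
      Map<Integer, String> perPart = appenders.get(ebId);
      if (perPart == null) {
        perPart = new HashMap<Integer, String>();
        appenders.put(ebId, perPart);
      }
      String appender = perPart.get(partId);
      if (appender == null) {
        appender = "appender-" + ebId + "-" + partId; // stand-in for opening a shuffle file
        perPart.put(partId, appender);
      }
      return appender;                               // every task of ebId shares this instance
    }
  }
}

Because lookup and creation happen under the same lock, two tasks racing on the same partition cannot open two files for it.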

        githubbot ASF GitHub Bot added a comment -

        Github user babokim commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16340019

        — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java —
        @@ -0,0 +1,240 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.tajo.storage;
        +
        +import org.apache.commons.logging.Log;
        +import org.apache.commons.logging.LogFactory;
        +import org.apache.hadoop.fs.FileStatus;
        +import org.apache.hadoop.fs.FileSystem;
        +import org.apache.hadoop.fs.LocalDirAllocator;
        +import org.apache.hadoop.fs.Path;
        +import org.apache.tajo.ExecutionBlockId;
        +import org.apache.tajo.QueryUnitAttemptId;
        +import org.apache.tajo.catalog.Schema;
        +import org.apache.tajo.catalog.TableMeta;
        +import org.apache.tajo.conf.TajoConf;
        +import org.apache.tajo.conf.TajoConf.ConfVars;
        +import org.apache.tajo.util.Pair;
        +
        +import java.io.IOException;
        +import java.util.ArrayList;
        +import java.util.Collection;
        +import java.util.List;
        +import java.util.Map;
        +import java.util.concurrent.ConcurrentHashMap;
        +
        +public class HashShuffleAppenderManager {
        + private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
        +
        + private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
        + new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
        + private TajoConf systemConf;
        + private FileSystem defaultFS;
        + private FileSystem localFS;
        + private LocalDirAllocator lDirAllocator;
        + private int pageSize;
        +
+ public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
+   this.systemConf = systemConf;
+
+   // initialize LocalDirAllocator
+   lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+   // initialize DFS and LocalFileSystems
+   defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+   localFS = FileSystem.getLocal(systemConf);
+   pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
+ }
+
+ public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
+     TableMeta meta, Schema outSchema) throws IOException {
+   synchronized (appenderMap) {
+     Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+
+     if (partitionAppenderMap == null) {
+       partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
+       appenderMap.put(ebId, partitionAppenderMap);
+     }
+
+     PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
+     if (partitionAppenderMeta == null) {
+       Path dataFile = getDataFile(ebId, partId);
+       FileSystem fs = dataFile.getFileSystem(systemConf);
+       if (fs.exists(dataFile)) {
+         FileStatus status = fs.getFileStatus(dataFile);
+         LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
+       }
+
+       if (!fs.exists(dataFile.getParent())) {
+         fs.mkdirs(dataFile.getParent());
+       }
+       FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(
+           tajoConf).getAppender(meta, outSchema, dataFile);
+       appender.enableStats();
+       appender.init();
+
+       partitionAppenderMeta = new PartitionAppenderMeta();
+       partitionAppenderMeta.partId = partId;
+       partitionAppenderMeta.dataFile = dataFile;
+       partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
+       partitionAppenderMeta.appender.init();
+       partitionAppenderMap.put(partId, partitionAppenderMeta);
+
+       LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
+     }
+
+     return partitionAppenderMeta.appender;
+   }
+ }
+
+ public static int getPartParentId(int partId, TajoConf tajoConf) {
+   return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
— End diff –

The getPartParentId() method is used by TajoPullServerService.
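For context, the modulo in getPartParentId() simply spreads partition files across a fixed set of parent directories so that no single directory holds every partition file. A minimal sketch; numParentDirs is a hypothetical stand-in for the value Tajo reads from TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS:

public class PartDirSketch {
  public static void main(String[] args) {
    int numParentDirs = 10;                      // hypothetical; Tajo reads this from configuration
    for (int partId : new int[] { 0, 9, 10, 123 }) {
      int parentDir = partId % numParentDirs;    // partitions wrap around the parent dirs
      System.out.println(parentDir + "/" + partId);  // e.g. partId 123 lands in "3/123"
    }
  }
}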

        githubbot ASF GitHub Bot added a comment -

        Github user babokim commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16340021

        — Diff: tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java —
@@ -207,10 +208,13 @@ public void init(Configuration conf) {
 selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);

 localFS = new LocalFileSystem();

- super.init(new Configuration(conf));
+ //super.init(new Configuration(conf));
— End diff –

        Yes

        githubbot ASF GitHub Bot added a comment -

        Github user babokim commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16340074

        — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java —
        @@ -0,0 +1,240 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.tajo.storage;
        +
        +import org.apache.commons.logging.Log;
        +import org.apache.commons.logging.LogFactory;
        +import org.apache.hadoop.fs.FileStatus;
        +import org.apache.hadoop.fs.FileSystem;
        +import org.apache.hadoop.fs.LocalDirAllocator;
        +import org.apache.hadoop.fs.Path;
        +import org.apache.tajo.ExecutionBlockId;
        +import org.apache.tajo.QueryUnitAttemptId;
        +import org.apache.tajo.catalog.Schema;
        +import org.apache.tajo.catalog.TableMeta;
        +import org.apache.tajo.conf.TajoConf;
        +import org.apache.tajo.conf.TajoConf.ConfVars;
        +import org.apache.tajo.util.Pair;
        +
        +import java.io.IOException;
        +import java.util.ArrayList;
        +import java.util.Collection;
        +import java.util.List;
        +import java.util.Map;
        +import java.util.concurrent.ConcurrentHashMap;
        +
        +public class HashShuffleAppenderManager {
        + private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
        +
        + private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
        + new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
        + private TajoConf systemConf;
        + private FileSystem defaultFS;
        + private FileSystem localFS;
        + private LocalDirAllocator lDirAllocator;
        + private int pageSize;
        +
+ public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
+   this.systemConf = systemConf;
+
+   // initialize LocalDirAllocator
+   lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+   // initialize DFS and LocalFileSystems
+   defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+   localFS = FileSystem.getLocal(systemConf);
+   pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
+ }
+
+ public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
+     TableMeta meta, Schema outSchema) throws IOException {
+   synchronized (appenderMap) {
+     Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+
+     if (partitionAppenderMap == null) {
+       partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
+       appenderMap.put(ebId, partitionAppenderMap);
+     }
+
+     PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
+     if (partitionAppenderMeta == null) {
+       Path dataFile = getDataFile(ebId, partId);
+       FileSystem fs = dataFile.getFileSystem(systemConf);
+       if (fs.exists(dataFile)) {
+         FileStatus status = fs.getFileStatus(dataFile);
+         LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
+       }
+
+       if (!fs.exists(dataFile.getParent())) {
+         fs.mkdirs(dataFile.getParent());
+       }
+       FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(
+           tajoConf).getAppender(meta, outSchema, dataFile);
+       appender.enableStats();
+       appender.init();
+
+       partitionAppenderMeta = new PartitionAppenderMeta();
+       partitionAppenderMeta.partId = partId;
+       partitionAppenderMeta.dataFile = dataFile;
+       partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
+       partitionAppenderMeta.appender.init();
+       partitionAppenderMap.put(partId, partitionAppenderMeta);
+
+       LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
+     }
+
+     return partitionAppenderMeta.appender;
+   }
+ }
+
+ public static int getPartParentId(int partId, TajoConf tajoConf) {
+   return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
+ }
+
+ private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
+   try {
+     // the base dir for an output dir
+     String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
+     Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
+     //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
+
+     // If an EB has many partitions, too many shuffle files end up in a single directory.
+     return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
+   } catch (Exception e) {
+     LOG.error(e.getMessage(), e);
+     throw new IOException(e);
+   }
+ }
+
+ public Path getPartitionAppenderDataFile(ExecutionBlockId ebId, int partId) {
+   synchronized (appenderMap) {
+     Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+     if (partitionAppenderMap != null) {
+       PartitionAppenderMeta meta = partitionAppenderMap.get(partId);
+       if (meta != null) {
+         return meta.dataFile;
+       }
+     }
+   }
+
+   LOG.warn("Can't find HashShuffleAppender:" + ebId + ", part=" + partId);
+   return null;
+ }
+
+ public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
+   Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
+   synchronized (appenderMap) {
+     partitionAppenderMap = appenderMap.remove(ebId);
+   }
+
+   if (partitionAppenderMap == null) {
+     LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
+     return null;
+   }
+
+   // Send Intermediate data to QueryMaster.
+   List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
+   for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
+     try {
+       eachMeta.appender.close();
+       HashShuffleIntermediate intermediate =
+           new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
+               eachMeta.appender.getPages(),
+               eachMeta.appender.getMergedTupleIndexes());
+       intermEntries.add(intermediate);
+     } catch (IOException e) {
+       LOG.error(e.getMessage(), e);
+       throw e;
+     }
+   }
+
+   LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
+
+   return intermEntries;
+ }
+
+ public void taskFinished(QueryUnitAttemptId taskId) {
— End diff –

I'll change it.

        githubbot ASF GitHub Bot added a comment -

        Github user babokim commented on the pull request:

        https://github.com/apache/tajo/pull/115#issuecomment-52619426

Thank you for the comments. I've reflected them in the patch.

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on the pull request:

        https://github.com/apache/tajo/pull/115#issuecomment-52751304

Could you push your recent work? It seems you haven't pushed it yet.

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/115#discussion_r16465623

        — Diff: tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java —
@@ -923,20 +960,62 @@ public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext sche

     LOG.info(subQuery.getId()
         + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
-        + ", DeterminedTaskNum : " + fetches.size());
+        + ", Intermediate Size: " + totalIntermediateSize
+        + ", splitSize: " + splitVolume
+        + ", DeterminedTaskNum: " + fetches.size());
   }

-  static class IntermediateEntryComparator implements Comparator<IntermediateEntry> {
+  /**
+   * If an IntermediateEntry is larger than splitVolume, the List<FetchImpl> has a single element.
+   * @param ebId
+   * @param entries
+   * @param splitVolume
+   * @return
+   */
+  public static List<List<FetchImpl>> splitOrMergeIntermediates(
+      ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
+    // Each List<FetchImpl> has splitVolume size.
+    List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
+
+    Iterator<IntermediateEntry> iter = entries.iterator();
+    if (!iter.hasNext()) {
+      return null;
+    }
+    List<FetchImpl> fetchListForSingleTask = new ArrayList<FetchImpl>();
+    long fetchListVolume = 0;

-    @Override
-    public int compare(IntermediateEntry o1, IntermediateEntry o2) {
-      int cmp = Ints.compare(o1.getPartId(), o2.getPartId());
-      if (cmp != 0) {
-        return cmp;
+    while (iter.hasNext()) {
+      IntermediateEntry currentInterm = iter.next();
+
+      long firstSplitVolume = splitVolume - fetchListVolume;
+      if (firstSplitVolume < pageSize) {
+        firstSplitVolume = splitVolume;
+      }
+      List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
— End diff –

Thank you for the explanation. If you are OK with it, please add it to the source code for anyone who reads the code in the future.

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on the pull request:

        https://github.com/apache/tajo/pull/115#issuecomment-52815070

        +1

Even though Travis CI shows a failure, it does not seem to be related to this patch. I manually verified 'mvn clean install'; it works well and passes all unit tests.

Also, the patch looks nice to me. Ship it.

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/tajo/pull/115

        hjkim Hyoungjun Kim added a comment -

        Committed.


          People

          • Assignee:
            hjkim Hyoungjun Kim
            Reporter:
            hjkim Hyoungjun Kim
          • Votes:
            0
            Watchers:
            2
