Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: Worker
    • Labels:
      None

      Description

      • add ExecutionBlock start/stop event
      • add shareable context of ExecutionBlock

        Activity

        Hide
        githubbot ASF GitHub Bot added a comment -

        GitHub user jinossy opened a pull request:

        https://github.com/apache/tajo/pull/124

        TAJO-1015: Add executionblock event in worker

        add ExecutionBlock start/stop event
        add shareable context of ExecutionBlock

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/jinossy/tajo TAJO-1015

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/tajo/pull/124.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #124


        commit 0371386e9a296b5e343f1d38e2d45677e9d982d9
        Author: jhkim <jhkim@apache.org>
        Date: 2014-08-22T11:42:32Z

        TAJO-1015: Add executionblock event in worker.

        commit 26dbf81c1ecd065eb4612863bedd76972958fae7
        Author: jhkim <jhkim@apache.org>
        Date: 2014-08-25T04:41:24Z

        Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1015

        Conflicts:
        tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
        tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
        tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
        tajo-core/src/main/java/org/apache/tajo/worker/Task.java

        commit 08892028322764db5dac045bce9ca482760cb0d1
        Author: jhkim <jhkim@apache.org>
        Date: 2014-08-25T05:41:37Z

        fixed unit test failure


        Show
        githubbot ASF GitHub Bot added a comment - GitHub user jinossy opened a pull request: https://github.com/apache/tajo/pull/124 TAJO-1015 : Add executionblock event in worker add ExecutionBlock start/stop event add shareable context of ExecutionBlock You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinossy/tajo TAJO-1015 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/tajo/pull/124.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #124 commit 0371386e9a296b5e343f1d38e2d45677e9d982d9 Author: jhkim <jhkim@apache.org> Date: 2014-08-22T11:42:32Z TAJO-1015 : Add executionblock event in worker. commit 26dbf81c1ecd065eb4612863bedd76972958fae7 Author: jhkim <jhkim@apache.org> Date: 2014-08-25T04:41:24Z Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1015 Conflicts: tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java tajo-core/src/main/java/org/apache/tajo/worker/Task.java commit 08892028322764db5dac045bce9ca482760cb0d1 Author: jhkim <jhkim@apache.org> Date: 2014-08-25T05:41:37Z fixed unit test failure
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on the pull request:

        https://github.com/apache/tajo/pull/124#issuecomment-55072610

        I'm sorry for late review. Could you rebase it against the latest revision? If so, I'll review it shortly.

        Show
        githubbot ASF GitHub Bot added a comment - Github user hyunsik commented on the pull request: https://github.com/apache/tajo/pull/124#issuecomment-55072610 I'm sorry for late review. Could you rebase it against the latest revision? If so, I'll review it shortly.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user jinossy commented on the pull request:

        https://github.com/apache/tajo/pull/124#issuecomment-55211227

        I’ve rebase this issue

        Show
        githubbot ASF GitHub Bot added a comment - Github user jinossy commented on the pull request: https://github.com/apache/tajo/pull/124#issuecomment-55211227 I’ve rebase this issue
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/124#discussion_r17517726

        — Diff: tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java —
        @@ -0,0 +1,449 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.tajo.worker;
        +
        +import com.google.common.collect.Lists;
        +import com.google.common.collect.Maps;
        +import org.apache.commons.logging.Log;
        +import org.apache.commons.logging.LogFactory;
        +import org.apache.hadoop.fs.FileSystem;
        +import org.apache.hadoop.fs.LocalDirAllocator;
        +import org.apache.hadoop.fs.Path;
        +import org.apache.hadoop.security.UserGroupInformation;
        +import org.apache.hadoop.util.ReflectionUtils;
        +import org.apache.tajo.ExecutionBlockId;
        +import org.apache.tajo.QueryUnitAttemptId;
        +import org.apache.tajo.TajoProtos;
        +import org.apache.tajo.conf.TajoConf;
        +import org.apache.tajo.engine.query.QueryContext;
        +import org.apache.tajo.ipc.QueryMasterProtocol;
        +import org.apache.tajo.rpc.NettyClientBase;
        +import org.apache.tajo.rpc.NullCallback;
        +import org.apache.tajo.rpc.RpcChannelFactory;
        +import org.apache.tajo.rpc.RpcConnectionPool;
        +import org.apache.tajo.storage.HashShuffleAppenderManager;
        +import org.apache.tajo.storage.StorageUtil;
        +import org.apache.tajo.util.Pair;
        +import org.apache.tajo.worker.event.TaskRunnerStartEvent;
        +import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
        +
        +import java.io.IOException;
        +import java.net.InetSocketAddress;
        +import java.util.ArrayList;
        +import java.util.List;
        +import java.util.Map;
        +import java.util.concurrent.ConcurrentMap;
        +import java.util.concurrent.atomic.AtomicBoolean;
        +import java.util.concurrent.atomic.AtomicInteger;
        +
        +import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
        +
        +public class ExecutionBlockContext {
        + /** class logger */
        + private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class);
        +
        + private TaskRunnerManager manager;
        + public AtomicInteger completedTasksNum = new AtomicInteger();
        + public AtomicInteger succeededTasksNum = new AtomicInteger();
        + public AtomicInteger killedTasksNum = new AtomicInteger();
        + public AtomicInteger failedTasksNum = new AtomicInteger();
        +
        + private ClientSocketChannelFactory channelFactory;
        + // for temporal or intermediate files
        + private FileSystem localFS;
        + // for input files
        + private FileSystem defaultFS;
        + private ExecutionBlockId executionBlockId;
        + private QueryContext queryContext;
        + private String plan;
        +
        + private ExecutionBlockSharedResource resource;
        +
        + private TajoQueryEngine queryEngine;
        + private RpcConnectionPool connPool;
        + private InetSocketAddress qmMasterAddr;
        + private TajoConf systemConf;
        + // for the doAs block
        + private UserGroupInformation taskOwner;
        +
        + private Reporter reporter;
        +
        + private AtomicBoolean stop = new AtomicBoolean();
        +
        + // It keeps all of the query unit attempts while a TaskRunner is running.
        + private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap();
        +
        + private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
        +
        + public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster)
        + throws Throwable

        { + this.manager = manager; + this.executionBlockId = event.getExecutionBlockId(); + this.connPool = RpcConnectionPool.getPool(manager.getTajoConf()); + this.qmMasterAddr = queryMaster; + this.systemConf = manager.getTajoConf(); + this.reporter = new Reporter(); + this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); + this.localFS = FileSystem.getLocal(systemConf); + + // Setup QueryEngine according to the query plan + // Here, we can setup row-based query engine or columnar query engine. + this.queryEngine = new TajoQueryEngine(systemConf); + this.queryContext = event.getQueryContext(); + this.plan = event.getPlan(); + this.resource = new ExecutionBlockSharedResource(); + + init(); + }

        +
        + public void init() throws Throwable {
        +
        + LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
        + LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
        +
        + LOG.info("QueryMaster Address:" + qmMasterAddr);
        +
        + UserGroupInformation.setConfiguration(systemConf);
        + // TODO - 'load credential' should be implemented
        + // Getting taskOwner
        + UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
        + //taskOwner.addToken(token);
        +
        + // initialize MasterWorkerProtocol as an actual task owner.
        +// this.client =
        — End diff –

        Could you remove the commented lines?

        Show
        githubbot ASF GitHub Bot added a comment - Github user hyunsik commented on a diff in the pull request: https://github.com/apache/tajo/pull/124#discussion_r17517726 — Diff: tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java — @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.util.Pair; +import org.apache.tajo.worker.event.TaskRunnerStartEvent; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.*; + +public class ExecutionBlockContext { + /** class logger */ + private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class); + + private TaskRunnerManager manager; + public AtomicInteger completedTasksNum = new AtomicInteger(); + public AtomicInteger succeededTasksNum = new AtomicInteger(); + public AtomicInteger killedTasksNum = new AtomicInteger(); + public AtomicInteger failedTasksNum = new AtomicInteger(); + + private ClientSocketChannelFactory channelFactory; + // for temporal or intermediate files + private FileSystem localFS; + // for input files + private FileSystem defaultFS; + private ExecutionBlockId executionBlockId; + private QueryContext queryContext; + private String plan; + + private ExecutionBlockSharedResource resource; + + private TajoQueryEngine queryEngine; + private RpcConnectionPool connPool; + private InetSocketAddress qmMasterAddr; + private TajoConf systemConf; + // for the doAs block + private UserGroupInformation taskOwner; + + private Reporter reporter; + + private AtomicBoolean stop = new AtomicBoolean(); + + // It keeps all of the query unit attempts while a TaskRunner is running. + private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap(); + + private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap(); + + public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster) + throws Throwable { + this.manager = manager; + this.executionBlockId = event.getExecutionBlockId(); + this.connPool = RpcConnectionPool.getPool(manager.getTajoConf()); + this.qmMasterAddr = queryMaster; + this.systemConf = manager.getTajoConf(); + this.reporter = new Reporter(); + this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); + this.localFS = FileSystem.getLocal(systemConf); + + // Setup QueryEngine according to the query plan + // Here, we can setup row-based query engine or columnar query engine. + this.queryEngine = new TajoQueryEngine(systemConf); + this.queryContext = event.getQueryContext(); + this.plan = event.getPlan(); + this.resource = new ExecutionBlockSharedResource(); + + init(); + } + + public void init() throws Throwable { + + LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR)); + LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR)); + + LOG.info("QueryMaster Address:" + qmMasterAddr); + + UserGroupInformation.setConfiguration(systemConf); + // TODO - 'load credential' should be implemented + // Getting taskOwner + UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME)); + //taskOwner.addToken(token); + + // initialize MasterWorkerProtocol as an actual task owner. +// this.client = — End diff – Could you remove the commented lines?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on the pull request:

        https://github.com/apache/tajo/pull/124#issuecomment-55517948

        +1
        I agree with this proposal. It allows tasks to share one execution context, and it will improve readability and make logic simpler. Before committing it, please remove some commented liens.

        Show
        githubbot ASF GitHub Bot added a comment - Github user hyunsik commented on the pull request: https://github.com/apache/tajo/pull/124#issuecomment-55517948 +1 I agree with this proposal. It allows tasks to share one execution context, and it will improve readability and make logic simpler. Before committing it, please remove some commented liens.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user jinossy commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/124#discussion_r17531064

        — Diff: tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java —
        @@ -0,0 +1,449 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.tajo.worker;
        +
        +import com.google.common.collect.Lists;
        +import com.google.common.collect.Maps;
        +import org.apache.commons.logging.Log;
        +import org.apache.commons.logging.LogFactory;
        +import org.apache.hadoop.fs.FileSystem;
        +import org.apache.hadoop.fs.LocalDirAllocator;
        +import org.apache.hadoop.fs.Path;
        +import org.apache.hadoop.security.UserGroupInformation;
        +import org.apache.hadoop.util.ReflectionUtils;
        +import org.apache.tajo.ExecutionBlockId;
        +import org.apache.tajo.QueryUnitAttemptId;
        +import org.apache.tajo.TajoProtos;
        +import org.apache.tajo.conf.TajoConf;
        +import org.apache.tajo.engine.query.QueryContext;
        +import org.apache.tajo.ipc.QueryMasterProtocol;
        +import org.apache.tajo.rpc.NettyClientBase;
        +import org.apache.tajo.rpc.NullCallback;
        +import org.apache.tajo.rpc.RpcChannelFactory;
        +import org.apache.tajo.rpc.RpcConnectionPool;
        +import org.apache.tajo.storage.HashShuffleAppenderManager;
        +import org.apache.tajo.storage.StorageUtil;
        +import org.apache.tajo.util.Pair;
        +import org.apache.tajo.worker.event.TaskRunnerStartEvent;
        +import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
        +
        +import java.io.IOException;
        +import java.net.InetSocketAddress;
        +import java.util.ArrayList;
        +import java.util.List;
        +import java.util.Map;
        +import java.util.concurrent.ConcurrentMap;
        +import java.util.concurrent.atomic.AtomicBoolean;
        +import java.util.concurrent.atomic.AtomicInteger;
        +
        +import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
        +
        +public class ExecutionBlockContext {
        + /** class logger */
        + private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class);
        +
        + private TaskRunnerManager manager;
        + public AtomicInteger completedTasksNum = new AtomicInteger();
        + public AtomicInteger succeededTasksNum = new AtomicInteger();
        + public AtomicInteger killedTasksNum = new AtomicInteger();
        + public AtomicInteger failedTasksNum = new AtomicInteger();
        +
        + private ClientSocketChannelFactory channelFactory;
        + // for temporal or intermediate files
        + private FileSystem localFS;
        + // for input files
        + private FileSystem defaultFS;
        + private ExecutionBlockId executionBlockId;
        + private QueryContext queryContext;
        + private String plan;
        +
        + private ExecutionBlockSharedResource resource;
        +
        + private TajoQueryEngine queryEngine;
        + private RpcConnectionPool connPool;
        + private InetSocketAddress qmMasterAddr;
        + private TajoConf systemConf;
        + // for the doAs block
        + private UserGroupInformation taskOwner;
        +
        + private Reporter reporter;
        +
        + private AtomicBoolean stop = new AtomicBoolean();
        +
        + // It keeps all of the query unit attempts while a TaskRunner is running.
        + private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap();
        +
        + private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
        +
        + public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster)
        + throws Throwable

        { + this.manager = manager; + this.executionBlockId = event.getExecutionBlockId(); + this.connPool = RpcConnectionPool.getPool(manager.getTajoConf()); + this.qmMasterAddr = queryMaster; + this.systemConf = manager.getTajoConf(); + this.reporter = new Reporter(); + this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); + this.localFS = FileSystem.getLocal(systemConf); + + // Setup QueryEngine according to the query plan + // Here, we can setup row-based query engine or columnar query engine. + this.queryEngine = new TajoQueryEngine(systemConf); + this.queryContext = event.getQueryContext(); + this.plan = event.getPlan(); + this.resource = new ExecutionBlockSharedResource(); + + init(); + }

        +
        + public void init() throws Throwable {
        +
        + LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
        + LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
        +
        + LOG.info("QueryMaster Address:" + qmMasterAddr);
        +
        + UserGroupInformation.setConfiguration(systemConf);
        + // TODO - 'load credential' should be implemented
        + // Getting taskOwner
        + UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
        + //taskOwner.addToken(token);
        +
        + // initialize MasterWorkerProtocol as an actual task owner.
        +// this.client =
        — End diff –

        done!

        Show
        githubbot ASF GitHub Bot added a comment - Github user jinossy commented on a diff in the pull request: https://github.com/apache/tajo/pull/124#discussion_r17531064 — Diff: tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java — @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.util.Pair; +import org.apache.tajo.worker.event.TaskRunnerStartEvent; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.*; + +public class ExecutionBlockContext { + /** class logger */ + private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class); + + private TaskRunnerManager manager; + public AtomicInteger completedTasksNum = new AtomicInteger(); + public AtomicInteger succeededTasksNum = new AtomicInteger(); + public AtomicInteger killedTasksNum = new AtomicInteger(); + public AtomicInteger failedTasksNum = new AtomicInteger(); + + private ClientSocketChannelFactory channelFactory; + // for temporal or intermediate files + private FileSystem localFS; + // for input files + private FileSystem defaultFS; + private ExecutionBlockId executionBlockId; + private QueryContext queryContext; + private String plan; + + private ExecutionBlockSharedResource resource; + + private TajoQueryEngine queryEngine; + private RpcConnectionPool connPool; + private InetSocketAddress qmMasterAddr; + private TajoConf systemConf; + // for the doAs block + private UserGroupInformation taskOwner; + + private Reporter reporter; + + private AtomicBoolean stop = new AtomicBoolean(); + + // It keeps all of the query unit attempts while a TaskRunner is running. + private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap(); + + private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap(); + + public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster) + throws Throwable { + this.manager = manager; + this.executionBlockId = event.getExecutionBlockId(); + this.connPool = RpcConnectionPool.getPool(manager.getTajoConf()); + this.qmMasterAddr = queryMaster; + this.systemConf = manager.getTajoConf(); + this.reporter = new Reporter(); + this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); + this.localFS = FileSystem.getLocal(systemConf); + + // Setup QueryEngine according to the query plan + // Here, we can setup row-based query engine or columnar query engine. + this.queryEngine = new TajoQueryEngine(systemConf); + this.queryContext = event.getQueryContext(); + this.plan = event.getPlan(); + this.resource = new ExecutionBlockSharedResource(); + + init(); + } + + public void init() throws Throwable { + + LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR)); + LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR)); + + LOG.info("QueryMaster Address:" + qmMasterAddr); + + UserGroupInformation.setConfiguration(systemConf); + // TODO - 'load credential' should be implemented + // Getting taskOwner + UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME)); + //taskOwner.addToken(token); + + // initialize MasterWorkerProtocol as an actual task owner. +// this.client = — End diff – done!
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/tajo/pull/124

        Show
        githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/tajo/pull/124
        Hide
        jhkim Jinho Kim added a comment -

        Thank you for the review. I've just committed it.

        Show
        jhkim Jinho Kim added a comment - Thank you for the review. I've just committed it.
        Hide
        hudson Hudson added a comment -

        SUCCESS: Integrated in Tajo-master-build #359 (See https://builds.apache.org/job/Tajo-master-build/359/)
        TAJO-1015: Add executionblock event in worker. (jinho) (jhkim: rev 15450e868ae6a985deb988e679d19e1364c95526)

        • tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
        • tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
        • tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
        • tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
        • tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
        • tajo-core/src/main/proto/TajoWorkerProtocol.proto
        • tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
        • tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
        • tajo-core/src/main/proto/QueryMasterProtocol.proto
        • tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
        • CHANGES
        • tajo-core/src/main/java/org/apache/tajo/worker/Task.java
        • tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
        • tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
        • tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
        • tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
        • tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
        • tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
        • tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
        • tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
        Show
        hudson Hudson added a comment - SUCCESS: Integrated in Tajo-master-build #359 (See https://builds.apache.org/job/Tajo-master-build/359/ ) TAJO-1015 : Add executionblock event in worker. (jinho) (jhkim: rev 15450e868ae6a985deb988e679d19e1364c95526) tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java tajo-core/src/main/proto/TajoWorkerProtocol.proto tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java tajo-core/src/main/proto/QueryMasterProtocol.proto tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java CHANGES tajo-core/src/main/java/org/apache/tajo/worker/Task.java tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java

          People

          • Assignee:
            jhkim Jinho Kim
            Reporter:
            jhkim Jinho Kim
          • Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development