diff --git pom.xml pom.xml index b3a22b5..8d9dac7 100644 --- pom.xml +++ pom.xml @@ -154,7 +154,7 @@ 4.0.4 0.5.2 2.2.0 - 1.2.0-SNAPSHOT + 1.2.1-SNAPSHOT 2.10 2.10.4 1.1 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 6217de4..828a3e3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -32,8 +32,6 @@ import org.apache.hive.spark.client.JobContext; import org.apache.hive.spark.client.JobHandle; import org.apache.hive.spark.client.SparkClient; -import org.apache.hive.spark.client.status.HiveSparkJobInfo; -import org.apache.hive.spark.client.status.HiveSparkStageInfo; import org.apache.spark.JobExecutionStatus; import org.apache.spark.SparkJobInfo; import org.apache.spark.SparkStageInfo; @@ -158,7 +156,7 @@ public JobExecutionStatus status() { }; } } - JobHandle getJobInfo = sparkClient.submit( + JobHandle getJobInfo = sparkClient.submit( new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId)); try { return getJobInfo.get(); @@ -169,7 +167,7 @@ public JobExecutionStatus status() { } private SparkStageInfo getSparkStageInfo(int stageId) { - JobHandle getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId)); + JobHandle getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId)); try { return getStageInfo.get(); } catch (Throwable t) { @@ -178,7 +176,7 @@ private SparkStageInfo getSparkStageInfo(int stageId) { } } - private static class GetJobInfoJob implements Job { + private static class GetJobInfoJob implements Job { private final String clientJobId; private final int sparkJobId; @@ -193,7 +191,7 @@ private GetJobInfoJob() { } @Override - public HiveSparkJobInfo call(JobContext jc) throws Exception { + public SparkJobInfo call(JobContext jc) throws Exception { SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); if (jobInfo == null) { List> list = jc.getMonitoredJobs().get(clientJobId); @@ -237,11 +235,11 @@ public JobExecutionStatus status() { } }; } - return new HiveSparkJobInfo(jobInfo); + return jobInfo; } } - private static class GetStageInfoJob implements Job { + private static class GetStageInfoJob implements Job { private final int stageId; private GetStageInfoJob() { @@ -254,9 +252,8 @@ private GetStageInfoJob() { } @Override - public HiveSparkStageInfo call(JobContext jc) throws Exception { - SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageId); - return stageInfo != null ? new HiveSparkStageInfo(stageInfo) : new HiveSparkStageInfo(); + public SparkStageInfo call(JobContext jc) throws Exception { + return jc.sc().statusTracker().getStageInfo(stageId); } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java index 437d61d..0c29c94 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java +++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java @@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.io.ByteBufferInputStream; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -57,6 +58,7 @@ protected Kryo initialValue() { kryo.register(klass, REG_ID_BASE + count); count++; } + kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); return kryo; } }; diff --git spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java deleted file mode 100644 index 8ea6969..0000000 --- spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.spark.client.status; - -import org.apache.spark.JobExecutionStatus; -import org.apache.spark.SparkJobInfo; - -import java.io.Serializable; - -/** - * Wrapper of SparkJobInfo - */ -public class HiveSparkJobInfo implements SparkJobInfo, Serializable { - private final int jobId; - private final int[] stageIds; - private final JobExecutionStatus status; - - public HiveSparkJobInfo(SparkJobInfo jobInfo) { - this.jobId = jobInfo.jobId(); - this.stageIds = jobInfo.stageIds(); - this.status = jobInfo.status(); - } - - public HiveSparkJobInfo(int jobId, int[] stageIds, JobExecutionStatus status) { - this.jobId = jobId; - this.stageIds = stageIds; - this.status = status; - } - - public HiveSparkJobInfo() { - this(-1, new int[0], JobExecutionStatus.UNKNOWN); - } - - @Override - public int jobId() { - return jobId; - } - - @Override - public int[] stageIds() { - return stageIds; - } - - @Override - public JobExecutionStatus status() { - return status; - } - -} diff --git spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java deleted file mode 100644 index dfbb01e..0000000 --- spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.spark.client.status; - -import org.apache.spark.SparkStageInfo; - -import java.io.Serializable; - -/** - * Wrapper of SparkStageInfo - */ -public class HiveSparkStageInfo implements SparkStageInfo, Serializable { - private final int stageId; - private final int currentAttemptId; - private final String name; - private final int numTasks; - private final int numActiveTasks; - private final int numCompletedTasks; - private final int numFailedTasks; - - public HiveSparkStageInfo(SparkStageInfo stageInfo) { - stageId = stageInfo.stageId(); - currentAttemptId = stageInfo.currentAttemptId(); - name = stageInfo.name(); - numTasks = stageInfo.numTasks(); - numActiveTasks = stageInfo.numActiveTasks(); - numCompletedTasks = stageInfo.numCompletedTasks(); - numFailedTasks = stageInfo.numFailedTasks(); - } - - public HiveSparkStageInfo(int stageId, int currentAttemptId, String name, - int numTasks, int numActiveTasks, int numCompletedTasks, int numFailedTasks) { - this.stageId = stageId; - this.currentAttemptId = currentAttemptId; - this.name = name; - this.numTasks = numTasks; - this.numActiveTasks = numActiveTasks; - this.numCompletedTasks = numCompletedTasks; - this.numFailedTasks = numFailedTasks; - } - - public HiveSparkStageInfo() { - this(-1, -1, null, -1, -1, -1, -1); - } - - @Override - public int stageId() { - return stageId; - } - - @Override - public int currentAttemptId() { - return currentAttemptId; - } - - @Override - public String name() { - return name; - } - - @Override - public int numTasks() { - return numTasks; - } - - @Override - public int numActiveTasks() { - return numActiveTasks; - } - - @Override - public int numCompletedTasks() { - return numCompletedTasks; - } - - @Override - public int numFailedTasks() { - return numFailedTasks; - } - -}