diff --git pom.xml pom.xml
index b3a22b5..8d9dac7 100644
--- pom.xml
+++ pom.xml
@@ -154,7 +154,7 @@
4.0.4
0.5.2
2.2.0
- 1.2.0-SNAPSHOT
+ 1.2.1-SNAPSHOT
2.10
2.10.4
1.1
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 6217de4..828a3e3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -32,8 +32,6 @@
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;
-import org.apache.hive.spark.client.status.HiveSparkJobInfo;
-import org.apache.hive.spark.client.status.HiveSparkStageInfo;
import org.apache.spark.JobExecutionStatus;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
@@ -158,7 +156,7 @@ public JobExecutionStatus status() {
};
}
}
- JobHandle getJobInfo = sparkClient.submit(
+ JobHandle getJobInfo = sparkClient.submit(
new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
try {
return getJobInfo.get();
@@ -169,7 +167,7 @@ public JobExecutionStatus status() {
}
private SparkStageInfo getSparkStageInfo(int stageId) {
- JobHandle getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId));
+ JobHandle getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId));
try {
return getStageInfo.get();
} catch (Throwable t) {
@@ -178,7 +176,7 @@ private SparkStageInfo getSparkStageInfo(int stageId) {
}
}
- private static class GetJobInfoJob implements Job {
+ private static class GetJobInfoJob implements Job {
private final String clientJobId;
private final int sparkJobId;
@@ -193,7 +191,7 @@ private GetJobInfoJob() {
}
@Override
- public HiveSparkJobInfo call(JobContext jc) throws Exception {
+ public SparkJobInfo call(JobContext jc) throws Exception {
SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
if (jobInfo == null) {
List> list = jc.getMonitoredJobs().get(clientJobId);
@@ -237,11 +235,11 @@ public JobExecutionStatus status() {
}
};
}
- return new HiveSparkJobInfo(jobInfo);
+ return jobInfo;
}
}
- private static class GetStageInfoJob implements Job {
+ private static class GetStageInfoJob implements Job {
private final int stageId;
private GetStageInfoJob() {
@@ -254,9 +252,8 @@ private GetStageInfoJob() {
}
@Override
- public HiveSparkStageInfo call(JobContext jc) throws Exception {
- SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageId);
- return stageInfo != null ? new HiveSparkStageInfo(stageInfo) : new HiveSparkStageInfo();
+ public SparkStageInfo call(JobContext jc) throws Exception {
+ return jc.sc().statusTracker().getStageInfo(stageId);
}
}
diff --git spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
index 437d61d..0c29c94 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
@@ -26,6 +26,7 @@
import com.esotericsoftware.kryo.io.ByteBufferInputStream;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@@ -57,6 +58,7 @@ protected Kryo initialValue() {
kryo.register(klass, REG_ID_BASE + count);
count++;
}
+ kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
return kryo;
}
};
diff --git spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java
deleted file mode 100644
index 8ea6969..0000000
--- spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.spark.client.status;
-
-import org.apache.spark.JobExecutionStatus;
-import org.apache.spark.SparkJobInfo;
-
-import java.io.Serializable;
-
-/**
- * Wrapper of SparkJobInfo
- */
-public class HiveSparkJobInfo implements SparkJobInfo, Serializable {
- private final int jobId;
- private final int[] stageIds;
- private final JobExecutionStatus status;
-
- public HiveSparkJobInfo(SparkJobInfo jobInfo) {
- this.jobId = jobInfo.jobId();
- this.stageIds = jobInfo.stageIds();
- this.status = jobInfo.status();
- }
-
- public HiveSparkJobInfo(int jobId, int[] stageIds, JobExecutionStatus status) {
- this.jobId = jobId;
- this.stageIds = stageIds;
- this.status = status;
- }
-
- public HiveSparkJobInfo() {
- this(-1, new int[0], JobExecutionStatus.UNKNOWN);
- }
-
- @Override
- public int jobId() {
- return jobId;
- }
-
- @Override
- public int[] stageIds() {
- return stageIds;
- }
-
- @Override
- public JobExecutionStatus status() {
- return status;
- }
-
-}
diff --git spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java
deleted file mode 100644
index dfbb01e..0000000
--- spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.spark.client.status;
-
-import org.apache.spark.SparkStageInfo;
-
-import java.io.Serializable;
-
-/**
- * Wrapper of SparkStageInfo
- */
-public class HiveSparkStageInfo implements SparkStageInfo, Serializable {
- private final int stageId;
- private final int currentAttemptId;
- private final String name;
- private final int numTasks;
- private final int numActiveTasks;
- private final int numCompletedTasks;
- private final int numFailedTasks;
-
- public HiveSparkStageInfo(SparkStageInfo stageInfo) {
- stageId = stageInfo.stageId();
- currentAttemptId = stageInfo.currentAttemptId();
- name = stageInfo.name();
- numTasks = stageInfo.numTasks();
- numActiveTasks = stageInfo.numActiveTasks();
- numCompletedTasks = stageInfo.numCompletedTasks();
- numFailedTasks = stageInfo.numFailedTasks();
- }
-
- public HiveSparkStageInfo(int stageId, int currentAttemptId, String name,
- int numTasks, int numActiveTasks, int numCompletedTasks, int numFailedTasks) {
- this.stageId = stageId;
- this.currentAttemptId = currentAttemptId;
- this.name = name;
- this.numTasks = numTasks;
- this.numActiveTasks = numActiveTasks;
- this.numCompletedTasks = numCompletedTasks;
- this.numFailedTasks = numFailedTasks;
- }
-
- public HiveSparkStageInfo() {
- this(-1, -1, null, -1, -1, -1, -1);
- }
-
- @Override
- public int stageId() {
- return stageId;
- }
-
- @Override
- public int currentAttemptId() {
- return currentAttemptId;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public int numTasks() {
- return numTasks;
- }
-
- @Override
- public int numActiveTasks() {
- return numActiveTasks;
- }
-
- @Override
- public int numCompletedTasks() {
- return numCompletedTasks;
- }
-
- @Override
- public int numFailedTasks() {
- return numFailedTasks;
- }
-
-}