From fc6f7fa61ee4196b64ac4499e8b6b43e33516dee Mon Sep 17 00:00:00 2001 From: nareshpr Date: Thu, 7 May 2020 20:58:20 -0700 Subject: [PATCH] HIVE-23409 - If TezSession application reopen fails for Timeline service down, default TezSession from SessionPool is closed after a retry --- .../hadoop/hive/ql/exec/tez/TezTask.java | 12 +++-- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 49 +++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index b1bf2f8903..3d276323f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -573,7 +573,6 @@ DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception dagClient = sessionState.getSession().submitDAG(dag); } catch (SessionNotRunning nr) { console.printInfo("Tez session was closed. Reopening..."); - sessionStateRef.value = null; sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag); @@ -583,13 +582,18 @@ DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception try { console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: " + Arrays.toString(e.getStackTrace()) + " retrying..."); - sessionStateRef.value = null; sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState); dagClient = sessionState.getSession().submitDAG(dag); } catch (Exception retryException) { - // we failed to submit after retrying. Destroy session and bail. + // we failed to submit after retrying. + // If this is a non-pool session, destroy it. + // Otherwise move it to sessionPool, reopen will retry. sessionStateRef.value = null; - sessionState.destroy(); + if (sessionState.isDefault() && sessionState instanceof TezSessionPoolSession) { + sessionState.returnToSessionManager(); + } else { + sessionState.destroy(); + } throw retryException; } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index cdcac4581b..e5ae58c827 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -69,6 +69,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyMap; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -231,6 +232,54 @@ public void testSubmit() throws Exception { verify(session, times(2)).submitDAG(any(DAG.class)); } + @Test + public void testSubmitOnNonPoolSession() throws Exception { + DAG dag = DAG.create("test"); + + // Destroy session incase of non-pool tez session + TezSessionState tezSessionState = mock(TezSessionState.class); + TezClient tezClient = mock(TezClient.class); + when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be submitted")); + when(tezSessionState.getSession()).thenReturn(tezClient); + when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); + doNothing().when(tezSessionState).destroy(); + boolean isException = false; + try { + task.submit(dag, Ref.from(tezSessionState)); + } catch (Exception e) { + isException = true; + verify(tezClient, times(1)).submitDAG(any(DAG.class)); + verify(tezSessionState, times(2)).reopen(); + verify(tezSessionState, times(1)).destroy(); + verify(tezSessionState, times(0)).returnToSessionManager(); + } + assertTrue(isException); + } + + @Test + public void testSubmitOnPoolSession() throws Exception { + DAG dag = DAG.create("test"); + // Move session to TezSessionPool, reopen will handle it + SampleTezSessionState tezSessionPoolSession = mock(SampleTezSessionState.class); + TezClient tezClient = mock(TezClient.class); + when(tezSessionPoolSession.reopen()).thenThrow(new HiveException("Dag cannot be submitted")); + doNothing().when(tezSessionPoolSession).returnToSessionManager(); + when(tezSessionPoolSession.getSession()).thenReturn(tezClient); + when(tezSessionPoolSession.isDefault()).thenReturn(true); + when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); + boolean isException = false; + try { + task.submit(dag, Ref.from(tezSessionPoolSession)); + } catch (Exception e) { + isException = true; + verify(tezClient, times(1)).submitDAG(any(DAG.class)); + verify(tezSessionPoolSession, times(2)).reopen(); + verify(tezSessionPoolSession, times(0)).destroy(); + verify(tezSessionPoolSession, times(1)).returnToSessionManager(); + } + assertTrue(isException); + } + @Test public void testClose() throws HiveException { task.close(work, 0, null); -- 2.20.1