diff --git beeline/pom.xml beeline/pom.xml index a720d08..eaada72 100644 --- beeline/pom.xml +++ beeline/pom.xml @@ -124,6 +124,11 @@ 9.1-901.jdbc4 test + + org.mockito + mockito-all + test + diff --git beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java index 5aaa385..a8427bf 100644 --- beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java +++ beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java @@ -58,6 +58,7 @@ PROPERTY_PREFIX + "system.exit"; public static final String DEFAULT_NULL_STRING = "NULL"; public static final char DEFAULT_DELIMITER_FOR_DSV = '|'; + public static final int DEFAULT_INCREMENTAL_BUFFER_ROWS = 1000; public static String URL_ENV_PREFIX = "BEELINE_URL_"; @@ -72,6 +73,7 @@ private boolean verbose = false; private boolean force = false; private boolean incremental = false; + private int incrementalBufferRows = DEFAULT_INCREMENTAL_BUFFER_ROWS; private boolean showWarnings = false; private boolean showNestedErrs = false; private boolean showElapsedTime = true; @@ -491,6 +493,14 @@ public boolean getIncremental() { return incremental; } + public void setIncrementalBufferRows(int incrementalBufferRows) { + this.incrementalBufferRows = incrementalBufferRows; + } + + public int getIncrementalBufferRows() { + return this.incrementalBufferRows; + } + public void setSilent(boolean silent) { this.silent = silent; } diff --git beeline/src/java/org/apache/hive/beeline/BufferedRows.java beeline/src/java/org/apache/hive/beeline/BufferedRows.java index 962c531..2049476 100644 --- beeline/src/java/org/apache/hive/beeline/BufferedRows.java +++ beeline/src/java/org/apache/hive/beeline/BufferedRows.java @@ -27,6 +27,9 @@ import java.util.Iterator; import java.util.LinkedList; +import com.google.common.base.Optional; + + /** * Rows implementation which buffers all rows in a linked list. */ @@ -35,20 +38,35 @@ private final Iterator iterator; BufferedRows(BeeLine beeLine, ResultSet rs) throws SQLException { + this(beeLine, rs, Optional. absent()); + } + + BufferedRows(BeeLine beeLine, ResultSet rs, Optional limit) throws SQLException { super(beeLine, rs); list = new LinkedList(); int count = rsMeta.getColumnCount(); list.add(new Row(count)); - while (rs.next()) { - list.add(new Row(count, rs)); + + int numRowsBuffered = 0; + if (limit.isPresent()) { + while (limit.get() > numRowsBuffered && rs.next()) { + this.list.add(new Row(count, rs)); + numRowsBuffered++; + } + } else { + while (rs.next()) { + this.list.add(new Row(count, rs)); + } } iterator = list.iterator(); } + @Override public boolean hasNext() { return iterator.hasNext(); } + @Override public Object next() { return iterator.next(); } @@ -73,5 +91,4 @@ void normalizeWidths() { row.sizes = max; } } - } diff --git beeline/src/java/org/apache/hive/beeline/IncrementalRows.java beeline/src/java/org/apache/hive/beeline/IncrementalRows.java index 8aef976..81821cb 100644 --- beeline/src/java/org/apache/hive/beeline/IncrementalRows.java +++ beeline/src/java/org/apache/hive/beeline/IncrementalRows.java @@ -26,22 +26,33 @@ import java.sql.SQLException; import java.util.NoSuchElementException; +import com.google.common.base.Optional; + + /** - * Rows implementation which returns rows incrementally from result set - * without any buffering. + * Rows implementation which returns rows incrementally from a {@link ResultSet}. This class is only + * used if --incremental=true. The class will buffer "x" number of rows in memory at a + * time. It uses the {@link BufferedRows} class to do its buffering. The value of "x" is determined + * by the Beeline option --incrementalBufferRows, which defaults to + * {@link BeeLineOpts#DEFAULT_INCREMENTAL_BUFFER_ROWS}. Once the initial set of rows are buffered, it + * will allow the {@link #next()} method to drain the buffer. Once the buffer is empty the next + * buffer will be fetched until the {@link ResultSet} is empty. The width of the rows are normalized + * within each buffer using the {@link BufferedRows#normalizeWidths()} method. */ public class IncrementalRows extends Rows { + private final ResultSet rs; private final Row labelRow; private final Row maxRow; - private Row nextRow; - private boolean endOfResult; + private final int incrementalBufferRows; private boolean normalizingWidths; - + private BufferedRows buffer; IncrementalRows(BeeLine beeLine, ResultSet rs) throws SQLException { super(beeLine, rs); + this.rs = rs; + this.incrementalBufferRows = beeLine.getOpts().getIncrementalBufferRows(); labelRow = new Row(rsMeta.getColumnCount()); maxRow = new Row(rsMeta.getColumnCount()); @@ -58,42 +69,41 @@ maxRow.sizes[i] = Math.min(maxWidth, maxRow.sizes[i]); } - nextRow = labelRow; - endOfResult = false; + this.buffer = new BufferedRows(beeLine, rs, Optional.of(this.incrementalBufferRows)); + this.buffer.normalizeWidths(); } - + @Override public boolean hasNext() { - if (endOfResult) { - return false; - } + try { + if (this.buffer.hasNext()) { + return true; + } else { + this.buffer = new BufferedRows(this.beeLine, this.rs, + Optional.of(this.incrementalBufferRows)); + if (this.normalizingWidths) { + this.buffer.normalizeWidths(); + } - if (nextRow == null) { - try { - if (rs.next()) { - nextRow = new Row(labelRow.sizes.length, rs); - - if (normalizingWidths) { - // perform incremental normalization - nextRow.sizes = labelRow.sizes; - } - } else { - endOfResult = true; + // Drain the first Row, which just contains column names + if (!this.buffer.hasNext()) { + return false; } - } catch (SQLException ex) { - throw new RuntimeException(ex.toString()); + this.buffer.next(); + + return this.buffer.hasNext(); } + } catch (SQLException ex) { + throw new RuntimeException(ex.toString()); } - return (nextRow != null); } + @Override public Object next() { if (!hasNext()) { throw new NoSuchElementException(); } - Object ret = nextRow; - nextRow = null; - return ret; + return this.buffer.next(); } @Override diff --git beeline/src/java/org/apache/hive/beeline/Rows.java beeline/src/java/org/apache/hive/beeline/Rows.java index 453f685..924b951 100644 --- beeline/src/java/org/apache/hive/beeline/Rows.java +++ beeline/src/java/org/apache/hive/beeline/Rows.java @@ -35,7 +35,7 @@ * Holds column values as strings */ abstract class Rows implements Iterator { - private final BeeLine beeLine; + protected final BeeLine beeLine; final ResultSetMetaData rsMeta; final Boolean[] primaryKeys; final NumberFormat numberFormat; diff --git beeline/src/main/resources/BeeLine.properties beeline/src/main/resources/BeeLine.properties index 7500df9..7602059 100644 --- beeline/src/main/resources/BeeLine.properties +++ beeline/src/main/resources/BeeLine.properties @@ -183,6 +183,8 @@ cmd-usage: Usage: java org.apache.hive.cli.beeline.BeeLine \n \ \ memory usage at the price of extra display column padding.\n \ \ Setting --incremental=true is recommended if you encounter an OutOfMemory\n \ \ on the client side (due to the fetched result set size being large).\n \ +\ --incrementalBufferRows=NUMROWS the number of rows to buffer when printing rows on stdout,\n \ +\ defaults to 1000; only applicable if --incremental=true\n \ \ --truncateTable=[true/false] truncate table column when it exceeds length\n \ \ --delimiterForDSV=DELIMITER specify the delimiter for delimiter-separated values output format (default: |)\n \ \ --isolation=LEVEL set the transaction isolation level\n \ diff --git beeline/src/test/org/apache/hive/beeline/TestIncrementalRows.java beeline/src/test/org/apache/hive/beeline/TestIncrementalRows.java new file mode 100644 index 0000000..f03303f --- /dev/null +++ beeline/src/test/org/apache/hive/beeline/TestIncrementalRows.java @@ -0,0 +1,71 @@ +package org.apache.hive.beeline; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import org.junit.Test; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + + +public class TestIncrementalRows { + + @Test + public void testIncrementalRows() throws SQLException { + Integer incrementalBufferRows = 5; + + // Mock BeeLineOpts + BeeLineOpts mockBeeLineOpts = mock(BeeLineOpts.class); + when(mockBeeLineOpts.getIncrementalBufferRows()).thenReturn(incrementalBufferRows); + when(mockBeeLineOpts.getMaxColumnWidth()).thenReturn(BeeLineOpts.DEFAULT_MAX_WIDTH); + when(mockBeeLineOpts.getNumberFormat()).thenReturn("default"); + when(mockBeeLineOpts.getNullString()).thenReturn("NULL"); + + // Mock BeeLine + BeeLine mockBeeline = mock(BeeLine.class); + when(mockBeeline.getOpts()).thenReturn(mockBeeLineOpts); + + // MockResultSet + ResultSet mockResultSet = mock(ResultSet.class); + + ResultSetMetaData mockResultSetMetaData = mock(ResultSetMetaData.class); + when(mockResultSetMetaData.getColumnCount()).thenReturn(1); + when(mockResultSetMetaData.getColumnLabel(1)).thenReturn("Mock Table"); + when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); + + // First 10 calls to resultSet.next() should return true + when(mockResultSet.next()).thenAnswer(new Answer() { + private int iterations = 10; + + @Override + public Boolean answer(InvocationOnMock invocation) { + return this.iterations-- > 0; + } + }); + + when(mockResultSet.getString(1)).thenReturn("Hello World"); + + // IncrementalRows constructor should buffer the first "incrementalBufferRows" rows + IncrementalRows incrementalRows = new IncrementalRows(mockBeeline, mockResultSet); + + // When the first buffer is loaded ResultSet.next() should be called "incrementalBufferRows" times + verify(mockResultSet, times(5)).next(); + + // Iterating through the buffer should not cause the next buffer to be fetched + for (int i = 0; i < incrementalBufferRows + 1; i++) { + incrementalRows.next(); + } + verify(mockResultSet, times(5)).next(); + + // When a new buffer is fetched ResultSet.next() should be called "incrementalBufferRows" more times + incrementalRows.next(); + verify(mockResultSet, times(10)).next(); + } +}